Posted to commits@kylin.apache.org by li...@apache.org on 2017/12/13 08:47:08 UTC

[2/3] kylin git commit: APACHE-KYLIN-2736: Use multiple threads to calculate HyperLogLogPlusCounter in FactDistinctColumnsMapper

APACHE-KYLIN-2736: Use multiple threads to calculate HyperLogLogPlusCounter in FactDistinctColumnsMapper

Signed-off-by: Zhong <nj...@apache.org>
Signed-off-by: lidongsjtu <li...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/fd59b513
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/fd59b513
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/fd59b513

Branch: refs/heads/master
Commit: fd59b513c7118e8c4e273ee2ed8b1be467490cc7
Parents: cb7f567
Author: Ma Gang <mg...@163.com>
Authored: Tue Sep 12 18:12:16 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Wed Dec 13 15:48:17 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |   9 +
 .../mr/steps/FactDistinctColumnsMapper.java     | 278 ++++++++++++++-----
 2 files changed, 215 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
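What the patch does, in outline: the mapper thread samples rows and fans each one
out to a small pool of calculator threads; each calculator owns a disjoint slice of
the cuboids and feeds its own HLLCounter array through a bounded blocking queue. A
minimal, self-contained sketch of that producer/consumer pattern (class and member
names here are illustrative, not the ones in the patch):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    class SliceWorker implements Runnable {
        private final BlockingQueue<String[]> queue = new LinkedBlockingQueue<>(2000);
        private final Thread thread = new Thread(this);
        private volatile boolean stop;

        void start() {
            thread.start();
        }

        void put(String[] row) throws InterruptedException {
            queue.put(row); // blocks when the queue is full, throttling the producer
        }

        void waitComplete() throws InterruptedException {
            stop = true;   // signal that no more rows will arrive
            thread.join(); // the worker drains the queue before exiting
        }

        @Override
        public void run() {
            while (true) {
                String[] row = queue.poll();
                if (row == null) {
                    if (stop) {
                        break; // queue drained and producer finished
                    }
                    Thread.yield();
                    continue;
                }
                process(row);
            }
        }

        private void process(String[] row) {
            // in the real patch: hash the row key columns and update this slice's HLL counters
        }
    }

The bounded queue (capacity 2000 in the patch) keeps memory use flat when a
calculator falls behind the mapper.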


http://git-wip-us.apache.org/repos/asf/kylin/blob/fd59b513/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 914ba7e..4ccf1d1 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1027,6 +1027,15 @@ abstract public class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.engine.mr.mapper-input-rows", "1000000"));
     }
 
+    public int getCuboidStatisticsCalculatorMaxNumber() {
+        // by default, multi-threaded statistics calculation is disabled (max one calculator)
+        return Integer.parseInt(getOptional("kylin.engine.mr.max-cuboid-stats-calculator-number", "1"));
+    }
+
+    public int getCuboidNumberPerStatisticsCalculator() {
+        return Integer.parseInt(getOptional("kylin.engine.mr.cuboid-number-per-stats-calculator", "100"));
+    }
+
     //UHC: ultra high cardinality columns, contain the ShardByColumns and the GlobalDictionaryColumns
     public int getUHCReducerCount() {
         return Integer.parseInt(getOptional("kylin.engine.mr.uhc-reducer-count", "5"));
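Both new settings above read through getOptional(), so they can be overridden per
cube (via cubeDesc.getConfig()) or globally in kylin.properties; with the defaults
(one calculator, 100 cuboids per calculator) the mapper keeps the old
single-threaded behaviour. An illustrative snippet (the values are examples, not
recommendations):

    # allow up to 3 calculator threads, each responsible for ~100 cuboids
    kylin.engine.mr.max-cuboid-stats-calculator-number=3
    kylin.engine.mr.cuboid-number-per-stats-calculator=100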

http://git-wip-us.apache.org/repos/asf/kylin/blob/fd59b513/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
index ace16a5..272894f 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
@@ -20,8 +20,11 @@ package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.KylinVersion;
@@ -55,18 +58,17 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
         BYTES
     }
 
-
     protected boolean collectStatistics = false;
     protected int nRowKey;
     private Integer[][] allCuboidsBitSet = null;
     private HLLCounter[] allCuboidsHLL = null;
     private Long[] cuboidIds;
-    private HashFunction hf = null;
     private int rowCount = 0;
     private int samplingPercentage;
-    //private ByteArray[] row_hashcodes = null;
-    private long[] rowHashCodesLong = null;
     private ByteBuffer tmpbuf;
+
+    private CuboidStatCalculator[] cuboidStatCalculators;
+
     private static final Text EMPTY_TEXT = new Text();
     public static final byte MARK_FOR_PARTITION_COL = (byte) 0xFE;
     public static final byte MARK_FOR_HLL = (byte) 0xFF;
@@ -76,9 +78,6 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
 
     private SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
 
-    //about details of the new algorithm, please see KYLIN-2518
-    private boolean isUsePutRowKeyToHllNewAlgorithm;
-
     @Override
     protected void doSetup(Context context) throws IOException {
         super.doSetup(context);
@@ -116,18 +115,60 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
                 needFetchPartitionCol = true;
             }
             //for KYLIN-2518 backward compatibility
+            boolean isUsePutRowKeyToHllNewAlgorithm;
             if (KylinVersion.isBefore200(cubeDesc.getVersion())) {
                 isUsePutRowKeyToHllNewAlgorithm = false;
-                hf = Hashing.murmur3_32();
                 logger.info("Found KylinVersion : {}. Use old algorithm for cuboid sampling.", cubeDesc.getVersion());
             } else {
                 isUsePutRowKeyToHllNewAlgorithm = true;
-                rowHashCodesLong = new long[nRowKey];
-                hf = Hashing.murmur3_128();
                 logger.info("Found KylinVersion : {}. Use new algorithm for cuboid sampling. About the details of the new algorithm, please refer to KYLIN-2518", cubeDesc.getVersion());
             }
+
+            int calculatorNum = getStatsThreadNum(cuboidIds.length);
+            cuboidStatCalculators = new CuboidStatCalculator[calculatorNum];
+            int splitSize = cuboidIds.length / calculatorNum;
+            if (splitSize <= 0) {
+                splitSize = 1;
+            }
+            for (int i = 0; i < calculatorNum; i++) {
+                HLLCounter[] cuboidsHLLSplit;
+                Integer[][] cuboidsBitSetSplit;
+                Long[] cuboidIdSplit;
+                int start = i * splitSize;
+                if (start >= cuboidIds.length) {
+                    break;
+                }
+                int end = (i + 1) * splitSize;
+                if (i == calculatorNum - 1) { // the last split takes the remainder
+                    end = cuboidIds.length;
+                }
+
+                cuboidsHLLSplit = Arrays.copyOfRange(allCuboidsHLL, start, end);
+                cuboidsBitSetSplit = Arrays.copyOfRange(allCuboidsBitSet, start, end);
+                cuboidIdSplit = Arrays.copyOfRange(cuboidIds, start, end);
+                CuboidStatCalculator calculator = new CuboidStatCalculator(i,
+                        intermediateTableDesc.getRowKeyColumnIndexes(), cuboidIdSplit, cuboidsBitSetSplit,
+                        isUsePutRowKeyToHllNewAlgorithm, cuboidsHLLSplit);
+                cuboidStatCalculators[i] = calculator;
+                calculator.start();
+            }
         }
+    }
 
+    private int getStatsThreadNum(int cuboidNum) {
+        int unitNum = cubeDesc.getConfig().getCuboidNumberPerStatisticsCalculator();
+        if (unitNum <= 0) {
+            logger.warn("Value " + unitNum + " from getCuboidNumberPerStatisticsCalculator() should be larger than 0");
+            logger.info("Will use single thread for cuboid statistics calculation");
+            return 1;
+        }
+
+        int maxCalculatorNum = cubeDesc.getConfig().getCuboidStatisticsCalculatorMaxNumber();
+        int calculatorNum = (cuboidNum - 1) / unitNum + 1;
+        if (calculatorNum > maxCalculatorNum) {
+            calculatorNum = maxCalculatorNum;
+        }
+        return calculatorNum;
     }
 
     @Override
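The arithmetic in getStatsThreadNum() above is a ceiling division capped by the
configured maximum. A standalone restatement with a worked example (the method name
is illustrative):

    static int statsThreadNum(int cuboidNum, int unitNum, int maxCalculatorNum) {
        if (unitNum <= 0) {
            return 1; // invalid config falls back to a single thread
        }
        int calculatorNum = (cuboidNum - 1) / unitNum + 1; // ceiling of cuboidNum / unitNum
        return Math.min(calculatorNum, maxCalculatorNum);
    }
    // e.g. 250 cuboids, 100 per calculator, max 4 -> min(3, 4) = 3 threads;
    // with the defaults (100 per calculator, max 1) the mapper stays single-threaded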
@@ -172,11 +213,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
 
             if (collectStatistics) {
                 if (rowCount % 100 < samplingPercentage) {
-                    if (isUsePutRowKeyToHllNewAlgorithm) {
-                        putRowKeyToHLLNew(row);
-                    } else {
-                        putRowKeyToHLLOld(row);
-                    }
+                    putRowKeyToHLL(row);
                 }
 
                 if (needFetchPartitionCol == true) {
@@ -200,6 +237,12 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
         }
     }
 
+    private void putRowKeyToHLL(String[] row) {
+        for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
+            cuboidStatCalculator.putRow(row);
+        }
+    }
+
     private long countSizeInBytes(String[] row) {
         int size = 0;
         for (String s : row) {
@@ -209,80 +252,171 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
         return size;
     }
 
-    private void putRowKeyToHLLOld(String[] row) {
-        //generate hash for each row key column
-        byte[][] rowHashCodes = new byte[nRowKey][];
-        for (int i = 0; i < nRowKey; i++) {
-            Hasher hc = hf.newHasher();
-            String colValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
-            if (colValue != null) {
-                rowHashCodes[i] = hc.putString(colValue).hash().asBytes();
-            } else {
-                rowHashCodes[i] = hc.putInt(0).hash().asBytes();
+    @Override
+    protected void doCleanup(Context context) throws IOException, InterruptedException {
+        if (collectStatistics) {
+            ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+            // output each cuboid's HLL to the reducer; the key is MARK_FOR_HLL followed by the cuboid id
+            for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
+                cuboidStatCalculator.waitComplete();
+            }
+            for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
+                Long[] cuboidIds = cuboidStatCalculator.getCuboidIds();
+                HLLCounter[] cuboidsHLL = cuboidStatCalculator.getHLLCounters();
+                HLLCounter hll;
+
+                for (int i = 0; i < cuboidIds.length; i++) {
+                    hll = cuboidsHLL[i];
+                    tmpbuf.clear();
+                    tmpbuf.put(MARK_FOR_HLL); // one byte
+                    tmpbuf.putLong(cuboidIds[i]);
+                    outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+                    hllBuf.clear();
+                    hll.writeRegisters(hllBuf);
+                    outputValue.set(hllBuf.array(), 0, hllBuf.position());
+                    sortableKey.init(outputKey, (byte) 0);
+                    context.write(sortableKey, outputValue);
+                }
             }
         }
+    }
 
-        // user the row key column hash to get a consolidated hash for each cuboid
-        for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
-            Hasher hc = hf.newHasher();
-            for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
-                hc.putBytes(rowHashCodes[allCuboidsBitSet[i][position]]);
+    private int countNewSize(int oldSize, int dataSize) {
+        int newSize = oldSize * 2;
+        while (newSize < dataSize) {
+            newSize = newSize * 2;
+        }
+        return newSize;
+    }
+
+    public static class CuboidStatCalculator implements Runnable {
+        private final int id;
+        private final int nRowKey;
+        private final int[] rowkeyColIndex;
+        private final Long[] cuboidIds;
+        private final Integer[][] cuboidsBitSet;
+        private volatile HLLCounter[] cuboidsHLL = null;
+
+        //about details of the new algorithm, please see KYLIN-2518
+        private final boolean isNewAlgorithm;
+        private final HashFunction hf;
+        private long[] rowHashCodesLong;
+
+        private BlockingQueue<String[]> queue = new LinkedBlockingQueue<String[]>(2000);
+        private Thread workThread;
+        private volatile boolean stop;
+
+        public CuboidStatCalculator(int id, int[] rowkeyColIndex, Long[] cuboidIds, Integer[][] cuboidsBitSet,
+                boolean isUsePutRowKeyToHllNewAlgorithm, HLLCounter[] cuboidsHLL) {
+            this.id = id;
+            this.nRowKey = rowkeyColIndex.length;
+            this.rowkeyColIndex = rowkeyColIndex;
+            this.cuboidIds = cuboidIds;
+            this.cuboidsBitSet = cuboidsBitSet;
+            this.isNewAlgorithm = isUsePutRowKeyToHllNewAlgorithm;
+            if (!isNewAlgorithm) {
+                this.hf = Hashing.murmur3_32();
+            } else {
+                rowHashCodesLong = new long[nRowKey];
+                this.hf = Hashing.murmur3_128();
             }
+            this.cuboidsHLL = cuboidsHLL;
+            workThread = new Thread(this);
+        }
 
-            allCuboidsHLL[i].add(hc.hash().asBytes());
+        public void start() {
+            logger.info("cuboid stats calculator:" + id + " started, handling " + cuboidIds.length + " cuboids");
+            workThread.start();
         }
-    }
 
-    private void putRowKeyToHLLNew(String[] row) {
-        //generate hash for each row key column
-        for (int i = 0; i < nRowKey; i++) {
-            Hasher hc = hf.newHasher();
-            String colValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
-            if (colValue == null)
-                colValue = "0";
-            byte[] bytes = hc.putString(colValue).hash().asBytes();
-            rowHashCodesLong[i] = (Bytes.toLong(bytes) + i);//add column ordinal to the hash value to distinguish between (a,b) and (b,a)
+        public void putRow(final String[] row) {
+            String[] copyRow = Arrays.copyOf(row, row.length);
+            try {
+                queue.put(copyRow);
+            } catch (InterruptedException e) {
+                logger.error("interrupt", e);
+            }
         }
 
-        // user the row key column hash to get a consolidated hash for each cuboid
-        for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
-            long value = 0;
-            for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
-                value += rowHashCodesLong[allCuboidsBitSet[i][position]];
+        public void waitComplete() {
+            stop = true;
+            try {
+                workThread.join();
+            } catch (InterruptedException e) {
+                logger.error("interrupt", e);
             }
-            allCuboidsHLL[i].addHashDirectly(value);
         }
-    }
 
-    @Override
-    protected void doCleanup(Context context) throws IOException, InterruptedException {
-        if (collectStatistics) {
-            ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
-            // output each cuboid's hll to reducer, key is 0 - cuboidId
-            HLLCounter hll;
-            for (int i = 0; i < cuboidIds.length; i++) {
-                hll = allCuboidsHLL[i];
+        private void putRowKeyToHLLOld(String[] row) {
+            //generate hash for each row key column
+            byte[][] rowHashCodes = new byte[nRowKey][];
+            for (int i = 0; i < nRowKey; i++) {
+                Hasher hc = hf.newHasher();
+                String colValue = row[rowkeyColIndex[i]];
+                if (colValue != null) {
+                    rowHashCodes[i] = hc.putString(colValue).hash().asBytes();
+                } else {
+                    rowHashCodes[i] = hc.putInt(0).hash().asBytes();
+                }
+            }
 
-                tmpbuf.clear();
-                tmpbuf.put(MARK_FOR_HLL); // one byte
-                tmpbuf.putLong(cuboidIds[i]);
-                outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
-                hllBuf.clear();
-                hll.writeRegisters(hllBuf);
-                outputValue.set(hllBuf.array(), 0, hllBuf.position());
-                sortableKey.init(outputKey, (byte) 0);
-                context.write(sortableKey, outputValue);
+            // use the row key column hashes to get a consolidated hash for each cuboid
+            for (int i = 0, n = cuboidsBitSet.length; i < n; i++) {
+                Hasher hc = hf.newHasher();
+                for (int position = 0; position < cuboidsBitSet[i].length; position++) {
+                    hc.putBytes(rowHashCodes[cuboidsBitSet[i][position]]);
+                }
+
+                cuboidsHLL[i].add(hc.hash().asBytes());
             }
         }
-    }
 
+        private void putRowKeyToHLLNew(String[] row) {
+            //generate hash for each row key column
+            for (int i = 0; i < nRowKey; i++) {
+                Hasher hc = hf.newHasher();
+                String colValue = row[rowkeyColIndex[i]];
+                if (colValue == null)
+                    colValue = "0";
+                byte[] bytes = hc.putString(colValue).hash().asBytes();
+                rowHashCodesLong[i] = (Bytes.toLong(bytes) + i);//add column ordinal to the hash value to distinguish between (a,b) and (b,a)
+            }
+
+            // use the row key column hashes to get a consolidated hash for each cuboid
+            for (int i = 0, n = cuboidsBitSet.length; i < n; i++) {
+                long value = 0;
+                for (int position = 0; position < cuboidsBitSet[i].length; position++) {
+                    value += rowHashCodesLong[cuboidsBitSet[i][position]];
+                }
+                cuboidsHLL[i].addHashDirectly(value);
+            }
+        }
 
-    private int countNewSize(int oldSize, int dataSize) {
-        int newSize = oldSize * 2;
-        while (newSize < dataSize) {
-            newSize = newSize * 2;
+        public HLLCounter[] getHLLCounters() {
+            return cuboidsHLL;
         }
-        return newSize;
-    }
 
+        public Long[] getCuboidIds() {
+            return cuboidIds;
+        }
+
+        @Override
+        public void run() {
+            while (true) {
+                String[] row = queue.poll();
+                if (row == null && stop) {
+                    logger.info("cuboid stats calculator:" + id + " completed.");
+                    break;
+                } else if (row == null) {
+                    Thread.yield();
+                    continue;
+                }
+                if (isNewAlgorithm) {
+                    putRowKeyToHLLNew(row);
+                } else {
+                    putRowKeyToHLLOld(row);
+                }
+            }
+        }
+    }
 }
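
Taken together, the calculator lifecycle as the mapper drives it is: start(), then
putRow() for every sampled row, then waitComplete() in doCleanup(), then reading
the counters back for output. A hedged usage sketch, assuming the constructor shown
above and an HLLCounter precision constructor; all concrete values are illustrative:

    int[] rowkeyColIndex = {0, 1};            // positions of the row key columns
    Long[] cuboidIds = {3L};                  // this calculator's slice of cuboids
    Integer[][] cuboidsBitSet = {{0, 1}};     // which row key columns each cuboid uses
    HLLCounter[] hll = {new HLLCounter(14)};  // assumes a precision-taking constructor

    CuboidStatCalculator calc = new CuboidStatCalculator(
            0, rowkeyColIndex, cuboidIds, cuboidsBitSet, true /* new algorithm */, hll);
    calc.start();
    calc.putRow(new String[] {"2017-09-12", "cn"}); // row is copied, then queued
    calc.waitComplete();                            // drains the queue, joins the thread
    long estimate = calc.getHLLCounters()[0].getCountEstimate(); // approximate NDV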