Posted to commits@kylin.apache.org by li...@apache.org on 2017/12/13 08:47:07 UTC

[1/3] kylin git commit: APACHE-KYLIN-2867: split large fuzzy key set

Repository: kylin
Updated Branches:
  refs/heads/master 029880f23 -> 79374047d


APACHE-KYLIN-2867: split large fuzzy key set

Signed-off-by: lidongsjtu <li...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/cb7f567f
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/cb7f567f
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/cb7f567f

Branch: refs/heads/master
Commit: cb7f567f43ea0b1466772c61fa5855f406cce661
Parents: 029880f
Author: Zhong <nj...@apache.org>
Authored: Mon Sep 18 14:53:49 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Wed Dec 13 14:23:17 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  4 ++
 .../storage/gtrecord/CubeScanRangePlanner.java  | 49 ++++++++++++++++----
 2 files changed, 43 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/cb7f567f/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 66805df..914ba7e 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -895,6 +895,10 @@ abstract public class KylinConfigBase implements Serializable {
         return Integer.parseInt(this.getOptional("kylin.storage.hbase.max-fuzzykey-scan", "200"));
     }
 
+    public int getQueryScanFuzzyKeySplitMax() {
+        return Integer.parseInt(this.getOptional("kylin.storage.hbase.max-fuzzykey-scan-split", "1"));
+    }
+
     public int getQueryStorageVisitScanRangeMax() {
         return Integer.valueOf(this.getOptional("kylin.storage.hbase.max-visit-scanrange", "1000000"));
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/cb7f567f/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
index 7e6f7c4..ef1114a 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
@@ -63,6 +63,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
     private static final Logger logger = LoggerFactory.getLogger(CubeScanRangePlanner.class);
 
     protected int maxScanRanges;
+    protected int maxFuzzyKeysPerSplit;
     protected int maxFuzzyKeys;
 
     //non-GT
@@ -77,7 +78,8 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
         this.context = context;
 
         this.maxScanRanges = cubeSegment.getConfig().getQueryStorageVisitScanRangeMax();
-        this.maxFuzzyKeys = cubeSegment.getConfig().getQueryScanFuzzyKeyMax();
+        this.maxFuzzyKeysPerSplit = cubeSegment.getConfig().getQueryScanFuzzyKeyMax();
+        this.maxFuzzyKeys = maxFuzzyKeysPerSplit * cubeSegment.getConfig().getQueryScanFuzzyKeySplitMax();
 
         this.cubeSegment = cubeSegment;
         this.cubeDesc = cubeSegment.getCubeDesc();
@@ -124,7 +126,8 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
     public CubeScanRangePlanner(GTInfo info, Pair<ByteArray, ByteArray> gtStartAndEnd, TblColRef gtPartitionCol, TupleFilter gtFilter) {
 
         this.maxScanRanges = KylinConfig.getInstanceFromEnv().getQueryStorageVisitScanRangeMax();
-        this.maxFuzzyKeys = KylinConfig.getInstanceFromEnv().getQueryScanFuzzyKeyMax();
+        this.maxFuzzyKeysPerSplit = KylinConfig.getInstanceFromEnv().getQueryScanFuzzyKeyMax();
+        this.maxFuzzyKeys = maxFuzzyKeysPerSplit * KylinConfig.getInstanceFromEnv().getQueryScanFuzzyKeySplitMax();
 
         this.gtInfo = info;
 
@@ -172,6 +175,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
         }
 
         List<GTScanRange> mergedRanges = mergeOverlapRanges(scanRanges);
+        mergedRanges = splitFuzzyKeys(mergedRanges);
         mergedRanges = mergeTooManyRanges(mergedRanges, maxScanRanges);
 
         return mergedRanges;
@@ -196,8 +200,6 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
         GTRecord pkEnd = new GTRecord(gtInfo);
         Map<Integer, Set<ByteArray>> fuzzyValues = Maps.newHashMap();
 
-        List<GTRecord> fuzzyKeys;
-
         for (ColumnRange range : andDimRanges) {
             if (gtPartitionCol != null && range.column.equals(gtPartitionCol)) {
                 int beginCompare = rangeStartEndComparator.comparator.compare(range.begin, gtStartAndEnd.getSecond());
@@ -224,9 +226,8 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
             }
         }
 
-        fuzzyKeys =
+        List<GTRecord> fuzzyKeys = buildFuzzyKeys(fuzzyValues);
 
-                buildFuzzyKeys(fuzzyValues);
         return new GTScanRange(pkStart, pkEnd, fuzzyKeys);
     }
 
@@ -243,7 +244,6 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
         }
 
         List<Map<Integer, ByteArray>> fuzzyValueCombinations = FuzzyValueCombination.calculate(fuzzyValueSet, maxFuzzyKeys);
-
         for (Map<Integer, ByteArray> fuzzyValue : fuzzyValueCombinations) {
 
             //            BitSet bitSet = new BitSet(gtInfo.getColumnCount());
@@ -309,7 +309,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
 
         GTRecord start = first.pkStart;
         GTRecord end = first.pkEnd;
-        List<GTRecord> newFuzzyKeys = new ArrayList<GTRecord>();
+        Set<GTRecord> newFuzzyKeys = Sets.newLinkedHashSet();
 
         boolean hasNonFuzzyRange = false;
         for (GTScanRange range : ranges) {
@@ -319,12 +319,15 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
         }
 
         // if any range is non-fuzzy, then all fuzzy keys must be cleared
-        // also too many fuzzy keys will slow down HBase scan
+        // too many fuzzy keys will slow down HBase scan
         if (hasNonFuzzyRange || newFuzzyKeys.size() > maxFuzzyKeys) {
+            if (newFuzzyKeys.size() > maxFuzzyKeys) {
+                logger.debug("too many FuzzyKeys,  clean it!");
+            }
             newFuzzyKeys.clear();
         }
 
-        return new GTScanRange(start, end, newFuzzyKeys);
+        return new GTScanRange(start, end, Lists.newArrayList(newFuzzyKeys));
     }
 
     protected List<GTScanRange> mergeTooManyRanges(List<GTScanRange> ranges, int maxRanges) {
@@ -336,6 +339,32 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
         List<GTScanRange> result = new ArrayList<GTScanRange>(1);
         GTScanRange mergedRange = mergeKeyRange(ranges);
         result.add(mergedRange);
+
+        result = splitFuzzyKeys(result);
+        return result;
+    }
+
+    private List<GTScanRange> splitFuzzyKeys(List<GTScanRange> mergedRanges) {
+        List<GTScanRange> result = Lists.newArrayList();
+        for (GTScanRange range : mergedRanges) {
+            // if the fuzzy key set exceeds the per-split limit but is still within the overall limit, split it into multiple smaller sets
+            if (range.fuzzyKeys.size() > maxFuzzyKeysPerSplit && range.fuzzyKeys.size() <= maxFuzzyKeys) {
+                List<GTRecord> fuzzyKeys = range.fuzzyKeys;
+                Collections.sort(fuzzyKeys);
+                int nSplit = (fuzzyKeys.size() - 1) / maxFuzzyKeysPerSplit + 1;
+                int nFuzzyKeysPerSplit = fuzzyKeys.size() / nSplit;
+                int startIndex = 0;
+                for (int i = 1; i <= nSplit; i++) {
+                    int endIndex = i == nSplit ? fuzzyKeys.size() : i * nFuzzyKeysPerSplit;
+                    List<GTRecord> subFuzzyKeys = fuzzyKeys.subList(startIndex, endIndex);
+                    result.add(new GTScanRange(range.pkStart, range.pkEnd, subFuzzyKeys));
+                    startIndex = endIndex;
+                }
+                logger.debug("large FuzzyKeys split size : " + result.size());
+            } else {
+                result.add(range);
+            }
+        }
         return result;
     }
 


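For reference, the splitFuzzyKeys() logic above divides a sorted fuzzy key list into nearly equal sublists of at most maxFuzzyKeysPerSplit entries, and only when the total count is still within maxFuzzyKeys. A minimal standalone sketch of that arithmetic (hypothetical class and method names, not part of the patch):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class FuzzyKeySplitSketch {
        // Mirrors the split arithmetic in splitFuzzyKeys(): ceiling-divide the
        // key count by the per-split limit, then cut nearly equal sublists,
        // with the last split absorbing the remainder.
        static <T> List<List<T>> split(List<T> keys, int maxPerSplit) {
            List<List<T>> result = new ArrayList<List<T>>();
            int nSplit = (keys.size() - 1) / maxPerSplit + 1; // ceiling division
            int perSplit = keys.size() / nSplit;              // floor
            int start = 0;
            for (int i = 1; i <= nSplit; i++) {
                int end = (i == nSplit) ? keys.size() : i * perSplit;
                result.add(keys.subList(start, end));
                start = end;
            }
            return result;
        }

        public static void main(String[] args) {
            // 7 keys with a per-split limit of 3 -> 3 splits of sizes 2, 2, 3
            System.out.println(split(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3));
            // prints [[1, 2], [3, 4], [5, 6, 7]]
        }
    }

Each resulting sublist becomes its own GTScanRange over the same pkStart/pkEnd, so HBase sees several scans with small fuzzy key sets instead of one scan with a huge set.
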
[2/3] kylin git commit: APACHE-KYLIN-2736: Use multiple threads to calculate HyperLogLogPlusCounter in FactDistinctColumnsMapper

Posted by li...@apache.org.
APACHE-KYLIN-2736: Use multiple threads to calculate HyperLogLogPlusCounter in FactDistinctColumnsMapper

Signed-off-by: Zhong <nj...@apache.org>
Signed-off-by: lidongsjtu <li...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/fd59b513
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/fd59b513
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/fd59b513

Branch: refs/heads/master
Commit: fd59b513c7118e8c4e273ee2ed8b1be467490cc7
Parents: cb7f567
Author: Ma Gang <mg...@163.com>
Authored: Tue Sep 12 18:12:16 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Wed Dec 13 15:48:17 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |   9 +
 .../mr/steps/FactDistinctColumnsMapper.java     | 278 ++++++++++++++-----
 2 files changed, 215 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/fd59b513/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 914ba7e..4ccf1d1 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1027,6 +1027,15 @@ abstract public class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.engine.mr.mapper-input-rows", "1000000"));
     }
 
+    public int getCuboidStatisticsCalculatorMaxNumber() {
+        // default multi-thread statistics calculation is disabled
+        return Integer.parseInt(getOptional("kylin.engine.mr.max-cuboid-stats-calculator-number", "1"));
+    }
+
+    public int getCuboidNumberPerStatisticsCalculator() {
+        return Integer.parseInt(getOptional("kylin.engine.mr.cuboid-number-per-stats-calculator", "100"));
+    }
+
     //UHC: ultra high cardinality columns, contain the ShardByColumns and the GlobalDictionaryColumns
     public int getUHCReducerCount() {
         return Integer.parseInt(getOptional("kylin.engine.mr.uhc-reducer-count", "5"));

http://git-wip-us.apache.org/repos/asf/kylin/blob/fd59b513/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
index ace16a5..272894f 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
@@ -20,8 +20,11 @@ package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.KylinVersion;
@@ -55,18 +58,17 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
         BYTES
     }
 
-
     protected boolean collectStatistics = false;
     protected int nRowKey;
     private Integer[][] allCuboidsBitSet = null;
     private HLLCounter[] allCuboidsHLL = null;
     private Long[] cuboidIds;
-    private HashFunction hf = null;
     private int rowCount = 0;
     private int samplingPercentage;
-    //private ByteArray[] row_hashcodes = null;
-    private long[] rowHashCodesLong = null;
     private ByteBuffer tmpbuf;
+
+    private CuboidStatCalculator[] cuboidStatCalculators;
+
     private static final Text EMPTY_TEXT = new Text();
     public static final byte MARK_FOR_PARTITION_COL = (byte) 0xFE;
     public static final byte MARK_FOR_HLL = (byte) 0xFF;
@@ -76,9 +78,6 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
 
     private SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
 
-    //about details of the new algorithm, please see KYLIN-2518
-    private boolean isUsePutRowKeyToHllNewAlgorithm;
-
     @Override
     protected void doSetup(Context context) throws IOException {
         super.doSetup(context);
@@ -116,18 +115,60 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
                 needFetchPartitionCol = true;
             }
             //for KYLIN-2518 backward compatibility
+            boolean isUsePutRowKeyToHllNewAlgorithm;
             if (KylinVersion.isBefore200(cubeDesc.getVersion())) {
                 isUsePutRowKeyToHllNewAlgorithm = false;
-                hf = Hashing.murmur3_32();
                 logger.info("Found KylinVersion : {}. Use old algorithm for cuboid sampling.", cubeDesc.getVersion());
             } else {
                 isUsePutRowKeyToHllNewAlgorithm = true;
-                rowHashCodesLong = new long[nRowKey];
-                hf = Hashing.murmur3_128();
                 logger.info("Found KylinVersion : {}. Use new algorithm for cuboid sampling. About the details of the new algorithm, please refer to KYLIN-2518", cubeDesc.getVersion());
             }
+
+            int calculatorNum = getStatsThreadNum(cuboidIds.length);
+            cuboidStatCalculators = new CuboidStatCalculator[calculatorNum];
+            int splitSize = cuboidIds.length / calculatorNum;
+            if (splitSize <= 0) {
+                splitSize = 1;
+            }
+            for (int i = 0; i < calculatorNum; i++) {
+                HLLCounter[] cuboidsHLLSplit;
+                Integer[][] cuboidsBitSetSplit;
+                Long[] cuboidIdSplit;
+                int start = i * splitSize;
+                if (start > cuboidIds.length) {
+                    break;
+                }
+                int end = (i + 1) * splitSize;
+                if (i == calculatorNum - 1) {// last split
+                    end = cuboidIds.length;
+                }
+
+                cuboidsHLLSplit = Arrays.copyOfRange(allCuboidsHLL, start, end);
+                cuboidsBitSetSplit = Arrays.copyOfRange(allCuboidsBitSet, start, end);
+                cuboidIdSplit = Arrays.copyOfRange(cuboidIds, start, end);
+                CuboidStatCalculator calculator = new CuboidStatCalculator(i,
+                        intermediateTableDesc.getRowKeyColumnIndexes(), cuboidIdSplit, cuboidsBitSetSplit,
+                        isUsePutRowKeyToHllNewAlgorithm, cuboidsHLLSplit);
+                cuboidStatCalculators[i] = calculator;
+                calculator.start();
+            }
         }
+    }
 
+    private int getStatsThreadNum(int cuboidNum) {
+        int unitNum = cubeDesc.getConfig().getCuboidNumberPerStatisticsCalculator();
+        if (unitNum <= 0) {
+            logger.warn("config from getCuboidNumberPerStatisticsCalculator() " + unitNum + " is should larger than 0");
+            logger.info("Will use single thread for cuboid statistics calculation");
+            return 1;
+        }
+
+        int maxCalculatorNum = cubeDesc.getConfig().getCuboidStatisticsCalculatorMaxNumber();
+        int calculatorNum = (cuboidNum - 1) / unitNum + 1;
+        if (calculatorNum > maxCalculatorNum) {
+            calculatorNum = maxCalculatorNum;
+        }
+        return calculatorNum;
     }
 
     @Override
@@ -172,11 +213,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
 
             if (collectStatistics) {
                 if (rowCount % 100 < samplingPercentage) {
-                    if (isUsePutRowKeyToHllNewAlgorithm) {
-                        putRowKeyToHLLNew(row);
-                    } else {
-                        putRowKeyToHLLOld(row);
-                    }
+                    putRowKeyToHLL(row);
                 }
 
                 if (needFetchPartitionCol == true) {
@@ -200,6 +237,12 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
         }
     }
 
+    private void putRowKeyToHLL(String[] row) {
+        for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
+            cuboidStatCalculator.putRow(row);
+        }
+    }
+
     private long countSizeInBytes(String[] row) {
         int size = 0;
         for (String s : row) {
@@ -209,80 +252,171 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
         return size;
     }
 
-    private void putRowKeyToHLLOld(String[] row) {
-        //generate hash for each row key column
-        byte[][] rowHashCodes = new byte[nRowKey][];
-        for (int i = 0; i < nRowKey; i++) {
-            Hasher hc = hf.newHasher();
-            String colValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
-            if (colValue != null) {
-                rowHashCodes[i] = hc.putString(colValue).hash().asBytes();
-            } else {
-                rowHashCodes[i] = hc.putInt(0).hash().asBytes();
+    @Override
+    protected void doCleanup(Context context) throws IOException, InterruptedException {
+        if (collectStatistics) {
+            ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+            // output each cuboid's hll to reducer, key is 0 - cuboidId
+            for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
+                cuboidStatCalculator.waitComplete();
+            }
+            for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
+                Long[] cuboidIds = cuboidStatCalculator.getCuboidIds();
+                HLLCounter[] cuboidsHLL = cuboidStatCalculator.getHLLCounters();
+                HLLCounter hll;
+
+                for (int i = 0; i < cuboidIds.length; i++) {
+                    hll = cuboidsHLL[i];
+                    tmpbuf.clear();
+                    tmpbuf.put(MARK_FOR_HLL); // one byte
+                    tmpbuf.putLong(cuboidIds[i]);
+                    outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+                    hllBuf.clear();
+                    hll.writeRegisters(hllBuf);
+                    outputValue.set(hllBuf.array(), 0, hllBuf.position());
+                    sortableKey.init(outputKey, (byte) 0);
+                    context.write(sortableKey, outputValue);
+                }
             }
         }
+    }
 
-        // user the row key column hash to get a consolidated hash for each cuboid
-        for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
-            Hasher hc = hf.newHasher();
-            for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
-                hc.putBytes(rowHashCodes[allCuboidsBitSet[i][position]]);
+    private int countNewSize(int oldSize, int dataSize) {
+        int newSize = oldSize * 2;
+        while (newSize < dataSize) {
+            newSize = newSize * 2;
+        }
+        return newSize;
+    }
+
+    public static class CuboidStatCalculator implements Runnable {
+        private final int id;
+        private final int nRowKey;
+        private final int[] rowkeyColIndex;
+        private final Long[] cuboidIds;
+        private final Integer[][] cuboidsBitSet;
+        private volatile HLLCounter[] cuboidsHLL = null;
+
+        //about details of the new algorithm, please see KYLIN-2518
+        private final boolean isNewAlgorithm;
+        private final HashFunction hf;
+        private long[] rowHashCodesLong;
+
+        private BlockingQueue<String[]> queue = new LinkedBlockingQueue<String[]>(2000);
+        private Thread workThread;
+        private volatile boolean stop;
+
+        public CuboidStatCalculator(int id, int[] rowkeyColIndex, Long[] cuboidIds, Integer[][] cuboidsBitSet,
+                boolean isUsePutRowKeyToHllNewAlgorithm, HLLCounter[] cuboidsHLL) {
+            this.id = id;
+            this.nRowKey = rowkeyColIndex.length;
+            this.rowkeyColIndex = rowkeyColIndex;
+            this.cuboidIds = cuboidIds;
+            this.cuboidsBitSet = cuboidsBitSet;
+            this.isNewAlgorithm = isUsePutRowKeyToHllNewAlgorithm;
+            if (!isNewAlgorithm) {
+                this.hf = Hashing.murmur3_32();
+            } else {
+                rowHashCodesLong = new long[nRowKey];
+                this.hf = Hashing.murmur3_128();
             }
+            this.cuboidsHLL = cuboidsHLL;
+            workThread = new Thread(this);
+        }
 
-            allCuboidsHLL[i].add(hc.hash().asBytes());
+        public void start() {
+            logger.info("cuboid stats calculator:" + id + " started, handle cuboids number:" + cuboidIds.length);
+            workThread.start();
         }
-    }
 
-    private void putRowKeyToHLLNew(String[] row) {
-        //generate hash for each row key column
-        for (int i = 0; i < nRowKey; i++) {
-            Hasher hc = hf.newHasher();
-            String colValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
-            if (colValue == null)
-                colValue = "0";
-            byte[] bytes = hc.putString(colValue).hash().asBytes();
-            rowHashCodesLong[i] = (Bytes.toLong(bytes) + i);//add column ordinal to the hash value to distinguish between (a,b) and (b,a)
+        public void putRow(final String[] row) {
+            String[] copyRow = Arrays.copyOf(row, row.length);
+            try {
+                queue.put(copyRow);
+            } catch (InterruptedException e) {
+                logger.error("interrupt", e);
+            }
         }
 
-        // user the row key column hash to get a consolidated hash for each cuboid
-        for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
-            long value = 0;
-            for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
-                value += rowHashCodesLong[allCuboidsBitSet[i][position]];
+        public void waitComplete() {
+            stop = true;
+            try {
+                workThread.join();
+            } catch (InterruptedException e) {
+                logger.error("interrupt", e);
             }
-            allCuboidsHLL[i].addHashDirectly(value);
         }
-    }
 
-    @Override
-    protected void doCleanup(Context context) throws IOException, InterruptedException {
-        if (collectStatistics) {
-            ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
-            // output each cuboid's hll to reducer, key is 0 - cuboidId
-            HLLCounter hll;
-            for (int i = 0; i < cuboidIds.length; i++) {
-                hll = allCuboidsHLL[i];
+        private void putRowKeyToHLLOld(String[] row) {
+            //generate hash for each row key column
+            byte[][] rowHashCodes = new byte[nRowKey][];
+            for (int i = 0; i < nRowKey; i++) {
+                Hasher hc = hf.newHasher();
+                String colValue = row[rowkeyColIndex[i]];
+                if (colValue != null) {
+                    rowHashCodes[i] = hc.putString(colValue).hash().asBytes();
+                } else {
+                    rowHashCodes[i] = hc.putInt(0).hash().asBytes();
+                }
+            }
 
-                tmpbuf.clear();
-                tmpbuf.put(MARK_FOR_HLL); // one byte
-                tmpbuf.putLong(cuboidIds[i]);
-                outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
-                hllBuf.clear();
-                hll.writeRegisters(hllBuf);
-                outputValue.set(hllBuf.array(), 0, hllBuf.position());
-                sortableKey.init(outputKey, (byte) 0);
-                context.write(sortableKey, outputValue);
+            // use the row key column hash to get a consolidated hash for each cuboid
+            for (int i = 0, n = cuboidsBitSet.length; i < n; i++) {
+                Hasher hc = hf.newHasher();
+                for (int position = 0; position < cuboidsBitSet[i].length; position++) {
+                    hc.putBytes(rowHashCodes[cuboidsBitSet[i][position]]);
+                }
+
+                cuboidsHLL[i].add(hc.hash().asBytes());
             }
         }
-    }
 
+        private void putRowKeyToHLLNew(String[] row) {
+            //generate hash for each row key column
+            for (int i = 0; i < nRowKey; i++) {
+                Hasher hc = hf.newHasher();
+                String colValue = row[rowkeyColIndex[i]];
+                if (colValue == null)
+                    colValue = "0";
+                byte[] bytes = hc.putString(colValue).hash().asBytes();
+                rowHashCodesLong[i] = (Bytes.toLong(bytes) + i); // add column ordinal to the hash value to distinguish between (a,b) and (b,a)
+            }
+
+            // use the row key column hash to get a consolidated hash for each cuboid
+            for (int i = 0, n = cuboidsBitSet.length; i < n; i++) {
+                long value = 0;
+                for (int position = 0; position < cuboidsBitSet[i].length; position++) {
+                    value += rowHashCodesLong[cuboidsBitSet[i][position]];
+                }
+                cuboidsHLL[i].addHashDirectly(value);
+            }
+        }
 
-    private int countNewSize(int oldSize, int dataSize) {
-        int newSize = oldSize * 2;
-        while (newSize < dataSize) {
-            newSize = newSize * 2;
+        public HLLCounter[] getHLLCounters() {
+            return cuboidsHLL;
         }
-        return newSize;
-    }
 
+        public Long[] getCuboidIds() {
+            return cuboidIds;
+        }
+
+        @Override
+        public void run() {
+            while (true) {
+                String[] row = queue.poll();
+                if (row == null && stop) {
+                    logger.info("cuboid stats calculator:" + id + " completed.");
+                    break;
+                } else if (row == null) {
+                    Thread.yield();
+                    continue;
+                }
+                if (isNewAlgorithm) {
+                    putRowKeyToHLLNew(row);
+                } else {
+                    putRowKeyToHLLOld(row);
+                }
+            }
+        }
+    }
 }

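For reference, each CuboidStatCalculator above owns a disjoint slice of the cuboid arrays (cut with Arrays.copyOfRange in doSetup()) and drains its own BlockingQueue on a dedicated thread, so the mapper only pays the cost of copying each sampled row once per calculator. The thread count comes from getStatsThreadNum(); a minimal sketch of that sizing arithmetic (hypothetical class name and illustrative values, not from the patch):

    public class StatsThreadSizingSketch {
        // Mirrors getStatsThreadNum(): one calculator per unitNum cuboids,
        // capped at maxCalculatorNum; bad config falls back to one thread.
        static int statsThreadNum(int cuboidNum, int unitNum, int maxCalculatorNum) {
            if (unitNum <= 0) {
                return 1;
            }
            int calculatorNum = (cuboidNum - 1) / unitNum + 1; // ceiling division
            return Math.min(calculatorNum, maxCalculatorNum);
        }

        public static void main(String[] args) {
            // 250 cuboids, 100 per calculator, cap 4 -> (250 - 1) / 100 + 1 = 3
            // threads; doSetup() then slices the HLL/bitset/id arrays so the
            // three calculators handle 83, 83 and 84 cuboids respectively.
            System.out.println(statsThreadNum(250, 100, 4)); // 3
        }
    }
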

[3/3] kylin git commit: KYLIN-2736 code refine

Posted by li...@apache.org.
KYLIN-2736 code refine


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/79374047
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/79374047
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/79374047

Branch: refs/heads/master
Commit: 79374047d3bcd8468bee9ac56f68f4f191edad7b
Parents: fd59b51
Author: lidongsjtu <li...@apache.org>
Authored: Wed Dec 13 16:44:44 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Wed Dec 13 16:44:44 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  6 ++--
 .../mr/steps/FactDistinctColumnsMapper.java     | 30 +++++++++++---------
 2 files changed, 19 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/79374047/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 4ccf1d1..d59c98d 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1027,12 +1027,12 @@ abstract public class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.engine.mr.mapper-input-rows", "1000000"));
     }
 
-    public int getCuboidStatisticsCalculatorMaxNumber() {
-        // default multi-thread statistics calculation is disabled
+    public int getCuboidStatsCalculatorMaxNumber() {
+        // set 1 to disable multi-thread statistics calculation
         return Integer.parseInt(getOptional("kylin.engine.mr.max-cuboid-stats-calculator-number", "1"));
     }
 
-    public int getCuboidNumberPerStatisticsCalculator() {
+    public int getCuboidNumberPerStatsCalculator() {
         return Integer.parseInt(getOptional("kylin.engine.mr.cuboid-number-per-stats-calculator", "100"));
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/79374047/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
index 272894f..575c7b3 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
@@ -46,8 +46,6 @@ import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
 
-
-
 /**
  */
 public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperBase<KEYIN, Object> {
@@ -84,7 +82,8 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
         tmpbuf = ByteBuffer.allocate(4096);
         collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED));
         if (collectStatistics) {
-            samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
+            samplingPercentage = Integer
+                    .parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
             nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
 
             Set<Long> cuboidIdSet = Sets.newHashSet(cubeSeg.getCuboidScheduler().getAllCuboidIds());
@@ -101,7 +100,6 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
                 allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision(), RegisterType.DENSE);
             }
 
-
             TblColRef partitionColRef = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
             if (partitionColRef != null) {
                 partitionColumnIndex = intermediateTableDesc.getColumnIndex(partitionColRef);
@@ -121,7 +119,9 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
                 logger.info("Found KylinVersion : {}. Use old algorithm for cuboid sampling.", cubeDesc.getVersion());
             } else {
                 isUsePutRowKeyToHllNewAlgorithm = true;
-                logger.info("Found KylinVersion : {}. Use new algorithm for cuboid sampling. About the details of the new algorithm, please refer to KYLIN-2518", cubeDesc.getVersion());
+                logger.info(
+                        "Found KylinVersion : {}. Use new algorithm for cuboid sampling. About the details of the new algorithm, please refer to KYLIN-2518",
+                        cubeDesc.getVersion());
             }
 
             int calculatorNum = getStatsThreadNum(cuboidIds.length);
@@ -135,7 +135,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
                 Integer[][] cuboidsBitSetSplit;
                 Long[] cuboidIdSplit;
                 int start = i * splitSize;
-                if (start > cuboidIds.length) {
+                if (start >= cuboidIds.length) {
                     break;
                 }
                 int end = (i + 1) * splitSize;
@@ -156,14 +156,14 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
     }
 
     private int getStatsThreadNum(int cuboidNum) {
-        int unitNum = cubeDesc.getConfig().getCuboidNumberPerStatisticsCalculator();
+        int unitNum = cubeDesc.getConfig().getCuboidNumberPerStatsCalculator();
         if (unitNum <= 0) {
-            logger.warn("config from getCuboidNumberPerStatisticsCalculator() " + unitNum + " is should larger than 0");
+            logger.warn("config from getCuboidNumberPerStatsCalculator() " + unitNum + " is should larger than 0");
             logger.info("Will use single thread for cuboid statistics calculation");
             return 1;
         }
 
-        int maxCalculatorNum = cubeDesc.getConfig().getCuboidStatisticsCalculatorMaxNumber();
+        int maxCalculatorNum = cubeDesc.getConfig().getCuboidStatsCalculatorMaxNumber();
         int calculatorNum = (cuboidNum - 1) / unitNum + 1;
         if (calculatorNum > maxCalculatorNum) {
             calculatorNum = maxCalculatorNum;
@@ -175,7 +175,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
     public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
         Collection<String[]> rowCollection = flatTableInputFormat.parseMapperInput(record);
 
-        for (String[] row: rowCollection) {
+        for (String[] row : rowCollection) {
             context.getCounter(RawDataCounter.BYTES).increment(countSizeInBytes(row));
             for (int i = 0; i < dictCols.size(); i++) {
                 String fieldValue = row[dictionaryColumnIndex[i]];
@@ -188,7 +188,8 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
                     reducerIndex = columnIndexToReducerBeginId.get(i);
                 } else {
                     //for the uhc
-                    reducerIndex = columnIndexToReducerBeginId.get(i) + (fieldValue.hashCode() & 0x7fffffff) % uhcReducerCount;
+                    reducerIndex = columnIndexToReducerBeginId.get(i)
+                            + (fieldValue.hashCode() & 0x7fffffff) % uhcReducerCount;
                 }
 
                 tmpbuf.clear();
@@ -207,7 +208,8 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
 
                 // log a few rows for troubleshooting
                 if (rowCount < 10) {
-                    logger.info("Sample output: " + dictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex);
+                    logger.info(
+                            "Sample output: " + dictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex);
                 }
             }
 
@@ -258,7 +260,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
             ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
             // output each cuboid's hll to reducer, key is 0 - cuboidId
             for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
-                cuboidStatCalculator.waitComplete();
+                cuboidStatCalculator.waitForCompletion();
             }
             for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
                 Long[] cuboidIds = cuboidStatCalculator.getCuboidIds();
@@ -338,7 +340,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
             }
         }
 
-        public void waitComplete() {
+        public void waitForCompletion() {
             stop = true;
             try {
                 workThread.join();
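
Taken together, the knobs introduced in these commits are read via getOptional() with the defaults shown in KylinConfigBase, so existing deployments keep the old single-scan, single-thread behavior. An illustrative kylin.properties snippet with non-default values (the keys are the ones read above; the values are examples, not recommendations):

    # Allow a large fuzzy key set to be split across up to 3 scan ranges
    # (default 1, i.e. no splitting)
    kylin.storage.hbase.max-fuzzykey-scan-split=3

    # Run up to 4 cuboid statistics calculator threads, one per 100 cuboids
    # (defaults: 1 and 100)
    kylin.engine.mr.max-cuboid-stats-calculator-number=4
    kylin.engine.mr.cuboid-number-per-stats-calculator=100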