You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/12/13 08:47:07 UTC
[1/3] kylin git commit: APACHE-KYLIN-2867: split large fuzzy key set
Repository: kylin
Updated Branches:
refs/heads/master 029880f23 -> 79374047d
APACHE-KYLIN-2867: split large fuzzy key set
Signed-off-by: lidongsjtu <li...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/cb7f567f
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/cb7f567f
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/cb7f567f
Branch: refs/heads/master
Commit: cb7f567f43ea0b1466772c61fa5855f406cce661
Parents: 029880f
Author: Zhong <nj...@apache.org>
Authored: Mon Sep 18 14:53:49 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Wed Dec 13 14:23:17 2017 +0800
----------------------------------------------------------------------
.../apache/kylin/common/KylinConfigBase.java | 4 ++
.../storage/gtrecord/CubeScanRangePlanner.java | 49 ++++++++++++++++----
2 files changed, 43 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/cb7f567f/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 66805df..914ba7e 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -895,6 +895,10 @@ abstract public class KylinConfigBase implements Serializable {
return Integer.parseInt(this.getOptional("kylin.storage.hbase.max-fuzzykey-scan", "200"));
}
+ public int getQueryScanFuzzyKeySplitMax() {
+ return Integer.parseInt(this.getOptional("kylin.storage.hbase.max-fuzzykey-scan-split", "1"));
+ }
+
public int getQueryStorageVisitScanRangeMax() {
return Integer.valueOf(this.getOptional("kylin.storage.hbase.max-visit-scanrange", "1000000"));
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/cb7f567f/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
index 7e6f7c4..ef1114a 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
@@ -63,6 +63,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
private static final Logger logger = LoggerFactory.getLogger(CubeScanRangePlanner.class);
protected int maxScanRanges;
+ protected int maxFuzzyKeysPerSplit;
protected int maxFuzzyKeys;
//non-GT
@@ -77,7 +78,8 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
this.context = context;
this.maxScanRanges = cubeSegment.getConfig().getQueryStorageVisitScanRangeMax();
- this.maxFuzzyKeys = cubeSegment.getConfig().getQueryScanFuzzyKeyMax();
+ this.maxFuzzyKeysPerSplit = cubeSegment.getConfig().getQueryScanFuzzyKeyMax();
+ this.maxFuzzyKeys = maxFuzzyKeysPerSplit * cubeSegment.getConfig().getQueryScanFuzzyKeySplitMax();
this.cubeSegment = cubeSegment;
this.cubeDesc = cubeSegment.getCubeDesc();
@@ -124,7 +126,8 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
public CubeScanRangePlanner(GTInfo info, Pair<ByteArray, ByteArray> gtStartAndEnd, TblColRef gtPartitionCol, TupleFilter gtFilter) {
this.maxScanRanges = KylinConfig.getInstanceFromEnv().getQueryStorageVisitScanRangeMax();
- this.maxFuzzyKeys = KylinConfig.getInstanceFromEnv().getQueryScanFuzzyKeyMax();
+ this.maxFuzzyKeysPerSplit = KylinConfig.getInstanceFromEnv().getQueryScanFuzzyKeyMax();
+ this.maxFuzzyKeys = maxFuzzyKeysPerSplit * KylinConfig.getInstanceFromEnv().getQueryScanFuzzyKeySplitMax();
this.gtInfo = info;
@@ -172,6 +175,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
}
List<GTScanRange> mergedRanges = mergeOverlapRanges(scanRanges);
+ mergedRanges = splitFuzzyKeys(mergedRanges);
mergedRanges = mergeTooManyRanges(mergedRanges, maxScanRanges);
return mergedRanges;
@@ -196,8 +200,6 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
GTRecord pkEnd = new GTRecord(gtInfo);
Map<Integer, Set<ByteArray>> fuzzyValues = Maps.newHashMap();
- List<GTRecord> fuzzyKeys;
-
for (ColumnRange range : andDimRanges) {
if (gtPartitionCol != null && range.column.equals(gtPartitionCol)) {
int beginCompare = rangeStartEndComparator.comparator.compare(range.begin, gtStartAndEnd.getSecond());
@@ -224,9 +226,8 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
}
}
- fuzzyKeys =
+ List<GTRecord> fuzzyKeys = buildFuzzyKeys(fuzzyValues);
- buildFuzzyKeys(fuzzyValues);
return new GTScanRange(pkStart, pkEnd, fuzzyKeys);
}
@@ -243,7 +244,6 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
}
List<Map<Integer, ByteArray>> fuzzyValueCombinations = FuzzyValueCombination.calculate(fuzzyValueSet, maxFuzzyKeys);
-
for (Map<Integer, ByteArray> fuzzyValue : fuzzyValueCombinations) {
// BitSet bitSet = new BitSet(gtInfo.getColumnCount());
@@ -309,7 +309,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
GTRecord start = first.pkStart;
GTRecord end = first.pkEnd;
- List<GTRecord> newFuzzyKeys = new ArrayList<GTRecord>();
+ Set<GTRecord> newFuzzyKeys = Sets.newLinkedHashSet();
boolean hasNonFuzzyRange = false;
for (GTScanRange range : ranges) {
@@ -319,12 +319,15 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
}
// if any range is non-fuzzy, then all fuzzy keys must be cleared
- // also too many fuzzy keys will slow down HBase scan
+ // too many fuzzy keys will slow down HBase scan
if (hasNonFuzzyRange || newFuzzyKeys.size() > maxFuzzyKeys) {
+ if (newFuzzyKeys.size() > maxFuzzyKeys) {
+ logger.debug("too many FuzzyKeys, clean it!");
+ }
newFuzzyKeys.clear();
}
- return new GTScanRange(start, end, newFuzzyKeys);
+ return new GTScanRange(start, end, Lists.newArrayList(newFuzzyKeys));
}
protected List<GTScanRange> mergeTooManyRanges(List<GTScanRange> ranges, int maxRanges) {
@@ -336,6 +339,32 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
List<GTScanRange> result = new ArrayList<GTScanRange>(1);
GTScanRange mergedRange = mergeKeyRange(ranges);
result.add(mergedRange);
+
+ result = splitFuzzyKeys(result);
+ return result;
+ }
+
+ private List<GTScanRange> splitFuzzyKeys(List<GTScanRange> mergedRanges) {
+ List<GTScanRange> result = Lists.newArrayList();
+ for (GTScanRange range : mergedRanges) {
+ // if the fuzzy key set is huge but still within the split range, then we split the fuzzy keys into multiple ones.
+ if (range.fuzzyKeys.size() > maxFuzzyKeysPerSplit && range.fuzzyKeys.size() <= maxFuzzyKeys) {
+ List<GTRecord> fuzzyKeys = range.fuzzyKeys;
+ Collections.sort(fuzzyKeys);
+ int nSplit = (fuzzyKeys.size() - 1) / maxFuzzyKeysPerSplit + 1;
+ int nFuzzyKeysPerSplit = fuzzyKeys.size() / nSplit;
+ int startIndex = 0;
+ for (int i = 1; i <= nSplit; i++) {
+ int endIndex = i == nSplit ? fuzzyKeys.size() : i * nFuzzyKeysPerSplit;
+ List<GTRecord> subFuzzyKeys = fuzzyKeys.subList(startIndex, endIndex);
+ result.add(new GTScanRange(range.pkStart, range.pkEnd, subFuzzyKeys));
+ startIndex = endIndex;
+ }
+ logger.debug("large FuzzyKeys split size : " + result.size());
+ } else {
+ result.add(range);
+ }
+ }
return result;
}
[2/3] kylin git commit: APACHE-KYLIN-2736: Use multiple threads to
calculate HyperLogLogPlusCounter in FactDistinctColumnsMapper
Posted by li...@apache.org.
APACHE-KYLIN-2736: Use multiple threads to calculate HyperLogLogPlusCounter in FactDistinctColumnsMapper
Signed-off-by: Zhong <nj...@apache.org>
Signed-off-by: lidongsjtu <li...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/fd59b513
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/fd59b513
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/fd59b513
Branch: refs/heads/master
Commit: fd59b513c7118e8c4e273ee2ed8b1be467490cc7
Parents: cb7f567
Author: Ma Gang <mg...@163.com>
Authored: Tue Sep 12 18:12:16 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Wed Dec 13 15:48:17 2017 +0800
----------------------------------------------------------------------
.../apache/kylin/common/KylinConfigBase.java | 9 +
.../mr/steps/FactDistinctColumnsMapper.java | 278 ++++++++++++++-----
2 files changed, 215 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/fd59b513/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 914ba7e..4ccf1d1 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1027,6 +1027,15 @@ abstract public class KylinConfigBase implements Serializable {
return Integer.parseInt(getOptional("kylin.engine.mr.mapper-input-rows", "1000000"));
}
+ public int getCuboidStatisticsCalculatorMaxNumber() {
+ // default multi-thread statistics calculation is disabled
+ return Integer.parseInt(getOptional("kylin.engine.mr.max-cuboid-stats-calculator-number", "1"));
+ }
+
+ public int getCuboidNumberPerStatisticsCalculator() {
+ return Integer.parseInt(getOptional("kylin.engine.mr.cuboid-number-per-stats-calculator", "100"));
+ }
+
//UHC: ultra high cardinality columns, contain the ShardByColumns and the GlobalDictionaryColumns
public int getUHCReducerCount() {
return Integer.parseInt(getOptional("kylin.engine.mr.uhc-reducer-count", "5"));
http://git-wip-us.apache.org/repos/asf/kylin/blob/fd59b513/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
index ace16a5..272894f 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
@@ -20,8 +20,11 @@ package org.apache.kylin.engine.mr.steps;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.io.Text;
import org.apache.kylin.common.KylinVersion;
@@ -55,18 +58,17 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
BYTES
}
-
protected boolean collectStatistics = false;
protected int nRowKey;
private Integer[][] allCuboidsBitSet = null;
private HLLCounter[] allCuboidsHLL = null;
private Long[] cuboidIds;
- private HashFunction hf = null;
private int rowCount = 0;
private int samplingPercentage;
- //private ByteArray[] row_hashcodes = null;
- private long[] rowHashCodesLong = null;
private ByteBuffer tmpbuf;
+
+ private CuboidStatCalculator[] cuboidStatCalculators;
+
private static final Text EMPTY_TEXT = new Text();
public static final byte MARK_FOR_PARTITION_COL = (byte) 0xFE;
public static final byte MARK_FOR_HLL = (byte) 0xFF;
@@ -76,9 +78,6 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
private SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
- //about details of the new algorithm, please see KYLIN-2518
- private boolean isUsePutRowKeyToHllNewAlgorithm;
-
@Override
protected void doSetup(Context context) throws IOException {
super.doSetup(context);
@@ -116,18 +115,60 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
needFetchPartitionCol = true;
}
//for KYLIN-2518 backward compatibility
+ boolean isUsePutRowKeyToHllNewAlgorithm;
if (KylinVersion.isBefore200(cubeDesc.getVersion())) {
isUsePutRowKeyToHllNewAlgorithm = false;
- hf = Hashing.murmur3_32();
logger.info("Found KylinVersion : {}. Use old algorithm for cuboid sampling.", cubeDesc.getVersion());
} else {
isUsePutRowKeyToHllNewAlgorithm = true;
- rowHashCodesLong = new long[nRowKey];
- hf = Hashing.murmur3_128();
logger.info("Found KylinVersion : {}. Use new algorithm for cuboid sampling. About the details of the new algorithm, please refer to KYLIN-2518", cubeDesc.getVersion());
}
+
+ int calculatorNum = getStatsThreadNum(cuboidIds.length);
+ cuboidStatCalculators = new CuboidStatCalculator[calculatorNum];
+ int splitSize = cuboidIds.length / calculatorNum;
+ if (splitSize <= 0) {
+ splitSize = 1;
+ }
+ for (int i = 0; i < calculatorNum; i++) {
+ HLLCounter[] cuboidsHLLSplit;
+ Integer[][] cuboidsBitSetSplit;
+ Long[] cuboidIdSplit;
+ int start = i * splitSize;
+ if (start > cuboidIds.length) {
+ break;
+ }
+ int end = (i + 1) * splitSize;
+ if (i == calculatorNum - 1) {// last split
+ end = cuboidIds.length;
+ }
+
+ cuboidsHLLSplit = Arrays.copyOfRange(allCuboidsHLL, start, end);
+ cuboidsBitSetSplit = Arrays.copyOfRange(allCuboidsBitSet, start, end);
+ cuboidIdSplit = Arrays.copyOfRange(cuboidIds, start, end);
+ CuboidStatCalculator calculator = new CuboidStatCalculator(i,
+ intermediateTableDesc.getRowKeyColumnIndexes(), cuboidIdSplit, cuboidsBitSetSplit,
+ isUsePutRowKeyToHllNewAlgorithm, cuboidsHLLSplit);
+ cuboidStatCalculators[i] = calculator;
+ calculator.start();
+ }
}
+ }
+ private int getStatsThreadNum(int cuboidNum) {
+ int unitNum = cubeDesc.getConfig().getCuboidNumberPerStatisticsCalculator();
+ if (unitNum <= 0) {
+ logger.warn("config from getCuboidNumberPerStatisticsCalculator() " + unitNum + " is should larger than 0");
+ logger.info("Will use single thread for cuboid statistics calculation");
+ return 1;
+ }
+
+ int maxCalculatorNum = cubeDesc.getConfig().getCuboidStatisticsCalculatorMaxNumber();
+ int calculatorNum = (cuboidNum - 1) / unitNum + 1;
+ if (calculatorNum > maxCalculatorNum) {
+ calculatorNum = maxCalculatorNum;
+ }
+ return calculatorNum;
}
@Override
@@ -172,11 +213,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
if (collectStatistics) {
if (rowCount % 100 < samplingPercentage) {
- if (isUsePutRowKeyToHllNewAlgorithm) {
- putRowKeyToHLLNew(row);
- } else {
- putRowKeyToHLLOld(row);
- }
+ putRowKeyToHLL(row);
}
if (needFetchPartitionCol == true) {
@@ -200,6 +237,12 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
}
}
+ private void putRowKeyToHLL(String[] row) {
+ for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
+ cuboidStatCalculator.putRow(row);
+ }
+ }
+
private long countSizeInBytes(String[] row) {
int size = 0;
for (String s : row) {
@@ -209,80 +252,171 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
return size;
}
- private void putRowKeyToHLLOld(String[] row) {
- //generate hash for each row key column
- byte[][] rowHashCodes = new byte[nRowKey][];
- for (int i = 0; i < nRowKey; i++) {
- Hasher hc = hf.newHasher();
- String colValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
- if (colValue != null) {
- rowHashCodes[i] = hc.putString(colValue).hash().asBytes();
- } else {
- rowHashCodes[i] = hc.putInt(0).hash().asBytes();
+ @Override
+ protected void doCleanup(Context context) throws IOException, InterruptedException {
+ if (collectStatistics) {
+ ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+ // output each cuboid's hll to reducer, key is 0 - cuboidId
+ for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
+ cuboidStatCalculator.waitComplete();
+ }
+ for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
+ Long[] cuboidIds = cuboidStatCalculator.getCuboidIds();
+ HLLCounter[] cuboidsHLL = cuboidStatCalculator.getHLLCounters();
+ HLLCounter hll;
+
+ for (int i = 0; i < cuboidIds.length; i++) {
+ hll = cuboidsHLL[i];
+ tmpbuf.clear();
+ tmpbuf.put(MARK_FOR_HLL); // one byte
+ tmpbuf.putLong(cuboidIds[i]);
+ outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+ hllBuf.clear();
+ hll.writeRegisters(hllBuf);
+ outputValue.set(hllBuf.array(), 0, hllBuf.position());
+ sortableKey.init(outputKey, (byte) 0);
+ context.write(sortableKey, outputValue);
+ }
}
}
+ }
- // user the row key column hash to get a consolidated hash for each cuboid
- for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
- Hasher hc = hf.newHasher();
- for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
- hc.putBytes(rowHashCodes[allCuboidsBitSet[i][position]]);
+ private int countNewSize(int oldSize, int dataSize) {
+ int newSize = oldSize * 2;
+ while (newSize < dataSize) {
+ newSize = newSize * 2;
+ }
+ return newSize;
+ }
+
+ public static class CuboidStatCalculator implements Runnable {
+ private final int id;
+ private final int nRowKey;
+ private final int[] rowkeyColIndex;
+ private final Long[] cuboidIds;
+ private final Integer[][] cuboidsBitSet;
+ private volatile HLLCounter[] cuboidsHLL = null;
+
+ //about details of the new algorithm, please see KYLIN-2518
+ private final boolean isNewAlgorithm;
+ private final HashFunction hf;
+ private long[] rowHashCodesLong;
+
+ private BlockingQueue<String[]> queue = new LinkedBlockingQueue<String[]>(2000);
+ private Thread workThread;
+ private volatile boolean stop;
+
+ public CuboidStatCalculator(int id, int[] rowkeyColIndex, Long[] cuboidIds, Integer[][] cuboidsBitSet,
+ boolean isUsePutRowKeyToHllNewAlgorithm, HLLCounter[] cuboidsHLL) {
+ this.id = id;
+ this.nRowKey = rowkeyColIndex.length;
+ this.rowkeyColIndex = rowkeyColIndex;
+ this.cuboidIds = cuboidIds;
+ this.cuboidsBitSet = cuboidsBitSet;
+ this.isNewAlgorithm = isUsePutRowKeyToHllNewAlgorithm;
+ if (!isNewAlgorithm) {
+ this.hf = Hashing.murmur3_32();
+ } else {
+ rowHashCodesLong = new long[nRowKey];
+ this.hf = Hashing.murmur3_128();
}
+ this.cuboidsHLL = cuboidsHLL;
+ workThread = new Thread(this);
+ }
- allCuboidsHLL[i].add(hc.hash().asBytes());
+ public void start() {
+ logger.info("cuboid stats calculator:" + id + " started, handle cuboids number:" + cuboidIds.length);
+ workThread.start();
}
- }
- private void putRowKeyToHLLNew(String[] row) {
- //generate hash for each row key column
- for (int i = 0; i < nRowKey; i++) {
- Hasher hc = hf.newHasher();
- String colValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
- if (colValue == null)
- colValue = "0";
- byte[] bytes = hc.putString(colValue).hash().asBytes();
- rowHashCodesLong[i] = (Bytes.toLong(bytes) + i);//add column ordinal to the hash value to distinguish between (a,b) and (b,a)
+ public void putRow(final String[] row) {
+ String[] copyRow = Arrays.copyOf(row, row.length);
+ try {
+ queue.put(copyRow);
+ } catch (InterruptedException e) {
+ logger.error("interrupt", e);
+ }
}
- // user the row key column hash to get a consolidated hash for each cuboid
- for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
- long value = 0;
- for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
- value += rowHashCodesLong[allCuboidsBitSet[i][position]];
+ public void waitComplete() {
+ stop = true;
+ try {
+ workThread.join();
+ } catch (InterruptedException e) {
+ logger.error("interrupt", e);
}
- allCuboidsHLL[i].addHashDirectly(value);
}
- }
- @Override
- protected void doCleanup(Context context) throws IOException, InterruptedException {
- if (collectStatistics) {
- ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
- // output each cuboid's hll to reducer, key is 0 - cuboidId
- HLLCounter hll;
- for (int i = 0; i < cuboidIds.length; i++) {
- hll = allCuboidsHLL[i];
+ private void putRowKeyToHLLOld(String[] row) {
+ //generate hash for each row key column
+ byte[][] rowHashCodes = new byte[nRowKey][];
+ for (int i = 0; i < nRowKey; i++) {
+ Hasher hc = hf.newHasher();
+ String colValue = row[rowkeyColIndex[i]];
+ if (colValue != null) {
+ rowHashCodes[i] = hc.putString(colValue).hash().asBytes();
+ } else {
+ rowHashCodes[i] = hc.putInt(0).hash().asBytes();
+ }
+ }
- tmpbuf.clear();
- tmpbuf.put(MARK_FOR_HLL); // one byte
- tmpbuf.putLong(cuboidIds[i]);
- outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
- hllBuf.clear();
- hll.writeRegisters(hllBuf);
- outputValue.set(hllBuf.array(), 0, hllBuf.position());
- sortableKey.init(outputKey, (byte) 0);
- context.write(sortableKey, outputValue);
+ // use the row key column hash to get a consolidated hash for each cuboid
+ for (int i = 0, n = cuboidsBitSet.length; i < n; i++) {
+ Hasher hc = hf.newHasher();
+ for (int position = 0; position < cuboidsBitSet[i].length; position++) {
+ hc.putBytes(rowHashCodes[cuboidsBitSet[i][position]]);
+ }
+
+ cuboidsHLL[i].add(hc.hash().asBytes());
}
}
- }
+ private void putRowKeyToHLLNew(String[] row) {
+ //generate hash for each row key column
+ for (int i = 0; i < nRowKey; i++) {
+ Hasher hc = hf.newHasher();
+ String colValue = row[rowkeyColIndex[i]];
+ if (colValue == null)
+ colValue = "0";
+ byte[] bytes = hc.putString(colValue).hash().asBytes();
+ rowHashCodesLong[i] = (Bytes.toLong(bytes) + i);//add column ordinal to the hash value to distinguish between (a,b) and (b,a)
+ }
+
+ // use the row key column hash to get a consolidated hash for each cuboid
+ for (int i = 0, n = cuboidsBitSet.length; i < n; i++) {
+ long value = 0;
+ for (int position = 0; position < cuboidsBitSet[i].length; position++) {
+ value += rowHashCodesLong[cuboidsBitSet[i][position]];
+ }
+ cuboidsHLL[i].addHashDirectly(value);
+ }
+ }
- private int countNewSize(int oldSize, int dataSize) {
- int newSize = oldSize * 2;
- while (newSize < dataSize) {
- newSize = newSize * 2;
+ public HLLCounter[] getHLLCounters() {
+ return cuboidsHLL;
}
- return newSize;
- }
+ public Long[] getCuboidIds() {
+ return cuboidIds;
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ String[] row = queue.poll();
+ if (row == null && stop) {
+ logger.info("cuboid stats calculator:" + id + " completed.");
+ break;
+ } else if (row == null) {
+ Thread.yield();
+ continue;
+ }
+ if (isNewAlgorithm) {
+ putRowKeyToHLLNew(row);
+ } else {
+ putRowKeyToHLLOld(row);
+ }
+ }
+ }
+ }
}
[3/3] kylin git commit: KYLIN-2736 code refine
Posted by li...@apache.org.
KYLIN-2736 code refine
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/79374047
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/79374047
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/79374047
Branch: refs/heads/master
Commit: 79374047d3bcd8468bee9ac56f68f4f191edad7b
Parents: fd59b51
Author: lidongsjtu <li...@apache.org>
Authored: Wed Dec 13 16:44:44 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Wed Dec 13 16:44:44 2017 +0800
----------------------------------------------------------------------
.../apache/kylin/common/KylinConfigBase.java | 6 ++--
.../mr/steps/FactDistinctColumnsMapper.java | 30 +++++++++++---------
2 files changed, 19 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/79374047/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 4ccf1d1..d59c98d 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1027,12 +1027,12 @@ abstract public class KylinConfigBase implements Serializable {
return Integer.parseInt(getOptional("kylin.engine.mr.mapper-input-rows", "1000000"));
}
- public int getCuboidStatisticsCalculatorMaxNumber() {
- // default multi-thread statistics calculation is disabled
+ public int getCuboidStatsCalculatorMaxNumber() {
+ // set 1 to disable multi-thread statistics calculation
return Integer.parseInt(getOptional("kylin.engine.mr.max-cuboid-stats-calculator-number", "1"));
}
- public int getCuboidNumberPerStatisticsCalculator() {
+ public int getCuboidNumberPerStatsCalculator() {
return Integer.parseInt(getOptional("kylin.engine.mr.cuboid-number-per-stats-calculator", "100"));
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/79374047/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
index 272894f..575c7b3 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
@@ -46,8 +46,6 @@ import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
-
-
/**
*/
public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperBase<KEYIN, Object> {
@@ -84,7 +82,8 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
tmpbuf = ByteBuffer.allocate(4096);
collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED));
if (collectStatistics) {
- samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
+ samplingPercentage = Integer
+ .parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
Set<Long> cuboidIdSet = Sets.newHashSet(cubeSeg.getCuboidScheduler().getAllCuboidIds());
@@ -101,7 +100,6 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision(), RegisterType.DENSE);
}
-
TblColRef partitionColRef = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
if (partitionColRef != null) {
partitionColumnIndex = intermediateTableDesc.getColumnIndex(partitionColRef);
@@ -121,7 +119,9 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
logger.info("Found KylinVersion : {}. Use old algorithm for cuboid sampling.", cubeDesc.getVersion());
} else {
isUsePutRowKeyToHllNewAlgorithm = true;
- logger.info("Found KylinVersion : {}. Use new algorithm for cuboid sampling. About the details of the new algorithm, please refer to KYLIN-2518", cubeDesc.getVersion());
+ logger.info(
+ "Found KylinVersion : {}. Use new algorithm for cuboid sampling. About the details of the new algorithm, please refer to KYLIN-2518",
+ cubeDesc.getVersion());
}
int calculatorNum = getStatsThreadNum(cuboidIds.length);
@@ -135,7 +135,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
Integer[][] cuboidsBitSetSplit;
Long[] cuboidIdSplit;
int start = i * splitSize;
- if (start > cuboidIds.length) {
+ if (start >= cuboidIds.length) {
break;
}
int end = (i + 1) * splitSize;
@@ -156,14 +156,14 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
}
private int getStatsThreadNum(int cuboidNum) {
- int unitNum = cubeDesc.getConfig().getCuboidNumberPerStatisticsCalculator();
+ int unitNum = cubeDesc.getConfig().getCuboidNumberPerStatsCalculator();
if (unitNum <= 0) {
- logger.warn("config from getCuboidNumberPerStatisticsCalculator() " + unitNum + " is should larger than 0");
+ logger.warn("config from getCuboidNumberPerStatsCalculator() " + unitNum + " is should larger than 0");
logger.info("Will use single thread for cuboid statistics calculation");
return 1;
}
- int maxCalculatorNum = cubeDesc.getConfig().getCuboidStatisticsCalculatorMaxNumber();
+ int maxCalculatorNum = cubeDesc.getConfig().getCuboidStatsCalculatorMaxNumber();
int calculatorNum = (cuboidNum - 1) / unitNum + 1;
if (calculatorNum > maxCalculatorNum) {
calculatorNum = maxCalculatorNum;
@@ -175,7 +175,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
Collection<String[]> rowCollection = flatTableInputFormat.parseMapperInput(record);
- for (String[] row: rowCollection) {
+ for (String[] row : rowCollection) {
context.getCounter(RawDataCounter.BYTES).increment(countSizeInBytes(row));
for (int i = 0; i < dictCols.size(); i++) {
String fieldValue = row[dictionaryColumnIndex[i]];
@@ -188,7 +188,8 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
reducerIndex = columnIndexToReducerBeginId.get(i);
} else {
//for the uhc
- reducerIndex = columnIndexToReducerBeginId.get(i) + (fieldValue.hashCode() & 0x7fffffff) % uhcReducerCount;
+ reducerIndex = columnIndexToReducerBeginId.get(i)
+ + (fieldValue.hashCode() & 0x7fffffff) % uhcReducerCount;
}
tmpbuf.clear();
@@ -207,7 +208,8 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
// log a few rows for troubleshooting
if (rowCount < 10) {
- logger.info("Sample output: " + dictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex);
+ logger.info(
+ "Sample output: " + dictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex);
}
}
@@ -258,7 +260,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
// output each cuboid's hll to reducer, key is 0 - cuboidId
for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
- cuboidStatCalculator.waitComplete();
+ cuboidStatCalculator.waitForCompletion();
}
for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
Long[] cuboidIds = cuboidStatCalculator.getCuboidIds();
@@ -338,7 +340,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
}
}
- public void waitComplete() {
+ public void waitForCompletion() {
stop = true;
try {
workThread.join();