You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/12/13 08:47:08 UTC
[2/3] kylin git commit: APACHE-KYLIN-2736: Use multiple threads to calculate HyperLogLogPlusCounter in FactDistinctColumnsMapper
APACHE-KYLIN-2736: Use multiple threads to calculate HyperLogLogPlusCounter in FactDistinctColumnsMapper
Signed-off-by: Zhong <nj...@apache.org>
Signed-off-by: lidongsjtu <li...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/fd59b513
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/fd59b513
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/fd59b513
Branch: refs/heads/master
Commit: fd59b513c7118e8c4e273ee2ed8b1be467490cc7
Parents: cb7f567
Author: Ma Gang <mg...@163.com>
Authored: Tue Sep 12 18:12:16 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Wed Dec 13 15:48:17 2017 +0800
----------------------------------------------------------------------
.../apache/kylin/common/KylinConfigBase.java | 9 +
.../mr/steps/FactDistinctColumnsMapper.java | 278 ++++++++++++++-----
2 files changed, 215 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/fd59b513/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 914ba7e..4ccf1d1 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1027,6 +1027,15 @@ abstract public class KylinConfigBase implements Serializable {
return Integer.parseInt(getOptional("kylin.engine.mr.mapper-input-rows", "1000000"));
}
+ public int getCuboidStatisticsCalculatorMaxNumber() {
+ // default multi-thread statistics calculation is disabled
+ return Integer.parseInt(getOptional("kylin.engine.mr.max-cuboid-stats-calculator-number", "1"));
+ }
+
+ public int getCuboidNumberPerStatisticsCalculator() {
+ return Integer.parseInt(getOptional("kylin.engine.mr.cuboid-number-per-stats-calculator", "100"));
+ }
+
//UHC: ultra high cardinality columns, contain the ShardByColumns and the GlobalDictionaryColumns
public int getUHCReducerCount() {
return Integer.parseInt(getOptional("kylin.engine.mr.uhc-reducer-count", "5"));
http://git-wip-us.apache.org/repos/asf/kylin/blob/fd59b513/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
index ace16a5..272894f 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
@@ -20,8 +20,11 @@ package org.apache.kylin.engine.mr.steps;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.io.Text;
import org.apache.kylin.common.KylinVersion;
@@ -55,18 +58,17 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
BYTES
}
-
protected boolean collectStatistics = false;
protected int nRowKey;
private Integer[][] allCuboidsBitSet = null;
private HLLCounter[] allCuboidsHLL = null;
private Long[] cuboidIds;
- private HashFunction hf = null;
private int rowCount = 0;
private int samplingPercentage;
- //private ByteArray[] row_hashcodes = null;
- private long[] rowHashCodesLong = null;
private ByteBuffer tmpbuf;
+
+ private CuboidStatCalculator[] cuboidStatCalculators;
+
private static final Text EMPTY_TEXT = new Text();
public static final byte MARK_FOR_PARTITION_COL = (byte) 0xFE;
public static final byte MARK_FOR_HLL = (byte) 0xFF;
@@ -76,9 +78,6 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
private SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
- //about details of the new algorithm, please see KYLIN-2518
- private boolean isUsePutRowKeyToHllNewAlgorithm;
-
@Override
protected void doSetup(Context context) throws IOException {
super.doSetup(context);
@@ -116,18 +115,60 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
needFetchPartitionCol = true;
}
//for KYLIN-2518 backward compatibility
+ boolean isUsePutRowKeyToHllNewAlgorithm;
if (KylinVersion.isBefore200(cubeDesc.getVersion())) {
isUsePutRowKeyToHllNewAlgorithm = false;
- hf = Hashing.murmur3_32();
logger.info("Found KylinVersion : {}. Use old algorithm for cuboid sampling.", cubeDesc.getVersion());
} else {
isUsePutRowKeyToHllNewAlgorithm = true;
- rowHashCodesLong = new long[nRowKey];
- hf = Hashing.murmur3_128();
logger.info("Found KylinVersion : {}. Use new algorithm for cuboid sampling. About the details of the new algorithm, please refer to KYLIN-2518", cubeDesc.getVersion());
}
+
+ int calculatorNum = getStatsThreadNum(cuboidIds.length);
+ cuboidStatCalculators = new CuboidStatCalculator[calculatorNum];
+ int splitSize = cuboidIds.length / calculatorNum;
+ if (splitSize <= 0) {
+ splitSize = 1;
+ }
+ for (int i = 0; i < calculatorNum; i++) {
+ HLLCounter[] cuboidsHLLSplit;
+ Integer[][] cuboidsBitSetSplit;
+ Long[] cuboidIdSplit;
+ int start = i * splitSize;
+ if (start > cuboidIds.length) {
+ break;
+ }
+ int end = (i + 1) * splitSize;
+ if (i == calculatorNum - 1) {// last split
+ end = cuboidIds.length;
+ }
+
+ cuboidsHLLSplit = Arrays.copyOfRange(allCuboidsHLL, start, end);
+ cuboidsBitSetSplit = Arrays.copyOfRange(allCuboidsBitSet, start, end);
+ cuboidIdSplit = Arrays.copyOfRange(cuboidIds, start, end);
+ CuboidStatCalculator calculator = new CuboidStatCalculator(i,
+ intermediateTableDesc.getRowKeyColumnIndexes(), cuboidIdSplit, cuboidsBitSetSplit,
+ isUsePutRowKeyToHllNewAlgorithm, cuboidsHLLSplit);
+ cuboidStatCalculators[i] = calculator;
+ calculator.start();
+ }
}
+ }
+ private int getStatsThreadNum(int cuboidNum) {
+ int unitNum = cubeDesc.getConfig().getCuboidNumberPerStatisticsCalculator();
+ if (unitNum <= 0) {
+ logger.warn("config from getCuboidNumberPerStatisticsCalculator() " + unitNum + " is should larger than 0");
+ logger.info("Will use single thread for cuboid statistics calculation");
+ return 1;
+ }
+
+ int maxCalculatorNum = cubeDesc.getConfig().getCuboidStatisticsCalculatorMaxNumber();
+ int calculatorNum = (cuboidNum - 1) / unitNum + 1;
+ if (calculatorNum > maxCalculatorNum) {
+ calculatorNum = maxCalculatorNum;
+ }
+ return calculatorNum;
}
@Override
@@ -172,11 +213,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
if (collectStatistics) {
if (rowCount % 100 < samplingPercentage) {
- if (isUsePutRowKeyToHllNewAlgorithm) {
- putRowKeyToHLLNew(row);
- } else {
- putRowKeyToHLLOld(row);
- }
+ putRowKeyToHLL(row);
}
if (needFetchPartitionCol == true) {
@@ -200,6 +237,12 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
}
}
+ private void putRowKeyToHLL(String[] row) {
+ for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
+ cuboidStatCalculator.putRow(row);
+ }
+ }
+
private long countSizeInBytes(String[] row) {
int size = 0;
for (String s : row) {
@@ -209,80 +252,171 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
return size;
}
- private void putRowKeyToHLLOld(String[] row) {
- //generate hash for each row key column
- byte[][] rowHashCodes = new byte[nRowKey][];
- for (int i = 0; i < nRowKey; i++) {
- Hasher hc = hf.newHasher();
- String colValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
- if (colValue != null) {
- rowHashCodes[i] = hc.putString(colValue).hash().asBytes();
- } else {
- rowHashCodes[i] = hc.putInt(0).hash().asBytes();
+ @Override
+ protected void doCleanup(Context context) throws IOException, InterruptedException {
+ if (collectStatistics) {
+ ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+ // output each cuboid's hll to reducer, key is 0 - cuboidId
+ for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
+ cuboidStatCalculator.waitComplete();
+ }
+ for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
+ Long[] cuboidIds = cuboidStatCalculator.getCuboidIds();
+ HLLCounter[] cuboidsHLL = cuboidStatCalculator.getHLLCounters();
+ HLLCounter hll;
+
+ for (int i = 0; i < cuboidIds.length; i++) {
+ hll = cuboidsHLL[i];
+ tmpbuf.clear();
+ tmpbuf.put(MARK_FOR_HLL); // one byte
+ tmpbuf.putLong(cuboidIds[i]);
+ outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+ hllBuf.clear();
+ hll.writeRegisters(hllBuf);
+ outputValue.set(hllBuf.array(), 0, hllBuf.position());
+ sortableKey.init(outputKey, (byte) 0);
+ context.write(sortableKey, outputValue);
+ }
}
}
+ }
- // user the row key column hash to get a consolidated hash for each cuboid
- for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
- Hasher hc = hf.newHasher();
- for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
- hc.putBytes(rowHashCodes[allCuboidsBitSet[i][position]]);
+ private int countNewSize(int oldSize, int dataSize) {
+ int newSize = oldSize * 2;
+ while (newSize < dataSize) {
+ newSize = newSize * 2;
+ }
+ return newSize;
+ }
+
+ public static class CuboidStatCalculator implements Runnable {
+ private final int id;
+ private final int nRowKey;
+ private final int[] rowkeyColIndex;
+ private final Long[] cuboidIds;
+ private final Integer[][] cuboidsBitSet;
+ private volatile HLLCounter[] cuboidsHLL = null;
+
+ //about details of the new algorithm, please see KYLIN-2518
+ private final boolean isNewAlgorithm;
+ private final HashFunction hf;
+ private long[] rowHashCodesLong;
+
+ private BlockingQueue<String[]> queue = new LinkedBlockingQueue<String[]>(2000);
+ private Thread workThread;
+ private volatile boolean stop;
+
+ public CuboidStatCalculator(int id, int[] rowkeyColIndex, Long[] cuboidIds, Integer[][] cuboidsBitSet,
+ boolean isUsePutRowKeyToHllNewAlgorithm, HLLCounter[] cuboidsHLL) {
+ this.id = id;
+ this.nRowKey = rowkeyColIndex.length;
+ this.rowkeyColIndex = rowkeyColIndex;
+ this.cuboidIds = cuboidIds;
+ this.cuboidsBitSet = cuboidsBitSet;
+ this.isNewAlgorithm = isUsePutRowKeyToHllNewAlgorithm;
+ if (!isNewAlgorithm) {
+ this.hf = Hashing.murmur3_32();
+ } else {
+ rowHashCodesLong = new long[nRowKey];
+ this.hf = Hashing.murmur3_128();
}
+ this.cuboidsHLL = cuboidsHLL;
+ workThread = new Thread(this);
+ }
- allCuboidsHLL[i].add(hc.hash().asBytes());
+ public void start() {
+ logger.info("cuboid stats calculator:" + id + " started, handle cuboids number:" + cuboidIds.length);
+ workThread.start();
}
- }
- private void putRowKeyToHLLNew(String[] row) {
- //generate hash for each row key column
- for (int i = 0; i < nRowKey; i++) {
- Hasher hc = hf.newHasher();
- String colValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
- if (colValue == null)
- colValue = "0";
- byte[] bytes = hc.putString(colValue).hash().asBytes();
- rowHashCodesLong[i] = (Bytes.toLong(bytes) + i);//add column ordinal to the hash value to distinguish between (a,b) and (b,a)
+ public void putRow(final String[] row) {
+ String[] copyRow = Arrays.copyOf(row, row.length);
+ try {
+ queue.put(copyRow);
+ } catch (InterruptedException e) {
+ logger.error("interrupt", e);
+ }
}
- // user the row key column hash to get a consolidated hash for each cuboid
- for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
- long value = 0;
- for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
- value += rowHashCodesLong[allCuboidsBitSet[i][position]];
+ public void waitComplete() {
+ stop = true;
+ try {
+ workThread.join();
+ } catch (InterruptedException e) {
+ logger.error("interrupt", e);
}
- allCuboidsHLL[i].addHashDirectly(value);
}
- }
- @Override
- protected void doCleanup(Context context) throws IOException, InterruptedException {
- if (collectStatistics) {
- ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
- // output each cuboid's hll to reducer, key is 0 - cuboidId
- HLLCounter hll;
- for (int i = 0; i < cuboidIds.length; i++) {
- hll = allCuboidsHLL[i];
+ private void putRowKeyToHLLOld(String[] row) {
+ //generate hash for each row key column
+ byte[][] rowHashCodes = new byte[nRowKey][];
+ for (int i = 0; i < nRowKey; i++) {
+ Hasher hc = hf.newHasher();
+ String colValue = row[rowkeyColIndex[i]];
+ if (colValue != null) {
+ rowHashCodes[i] = hc.putString(colValue).hash().asBytes();
+ } else {
+ rowHashCodes[i] = hc.putInt(0).hash().asBytes();
+ }
+ }
- tmpbuf.clear();
- tmpbuf.put(MARK_FOR_HLL); // one byte
- tmpbuf.putLong(cuboidIds[i]);
- outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
- hllBuf.clear();
- hll.writeRegisters(hllBuf);
- outputValue.set(hllBuf.array(), 0, hllBuf.position());
- sortableKey.init(outputKey, (byte) 0);
- context.write(sortableKey, outputValue);
+ // use the row key column hash to get a consolidated hash for each cuboid
+ for (int i = 0, n = cuboidsBitSet.length; i < n; i++) {
+ Hasher hc = hf.newHasher();
+ for (int position = 0; position < cuboidsBitSet[i].length; position++) {
+ hc.putBytes(rowHashCodes[cuboidsBitSet[i][position]]);
+ }
+
+ cuboidsHLL[i].add(hc.hash().asBytes());
}
}
- }
+ private void putRowKeyToHLLNew(String[] row) {
+ //generate hash for each row key column
+ for (int i = 0; i < nRowKey; i++) {
+ Hasher hc = hf.newHasher();
+ String colValue = row[rowkeyColIndex[i]];
+ if (colValue == null)
+ colValue = "0";
+ byte[] bytes = hc.putString(colValue).hash().asBytes();
+ rowHashCodesLong[i] = (Bytes.toLong(bytes) + i);//add column ordinal to the hash value to distinguish between (a,b) and (b,a)
+ }
+
+ // use the row key column hash to get a consolidated hash for each cuboid
+ for (int i = 0, n = cuboidsBitSet.length; i < n; i++) {
+ long value = 0;
+ for (int position = 0; position < cuboidsBitSet[i].length; position++) {
+ value += rowHashCodesLong[cuboidsBitSet[i][position]];
+ }
+ cuboidsHLL[i].addHashDirectly(value);
+ }
+ }
- private int countNewSize(int oldSize, int dataSize) {
- int newSize = oldSize * 2;
- while (newSize < dataSize) {
- newSize = newSize * 2;
+ public HLLCounter[] getHLLCounters() {
+ return cuboidsHLL;
}
- return newSize;
- }
+ public Long[] getCuboidIds() {
+ return cuboidIds;
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ String[] row = queue.poll();
+ if (row == null && stop) {
+ logger.info("cuboid stats calculator:" + id + " completed.");
+ break;
+ } else if (row == null) {
+ Thread.yield();
+ continue;
+ }
+ if (isNewAlgorithm) {
+ putRowKeyToHLLNew(row);
+ } else {
+ putRowKeyToHLLOld(row);
+ }
+ }
+ }
+ }
}