You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by bi...@apache.org on 2017/02/04 04:43:37 UTC
[08/47] kylin git commit: KYLIN-2409 change inmem cubing to single thread by default
KYLIN-2409 change inmem cubing to single thread by default
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/21969753
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/21969753
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/21969753
Branch: refs/heads/KYLIN-2361
Commit: 21969753cc66efa22bc9fc933af46346e9846631
Parents: b49c9e3
Author: shaofengshi <sh...@apache.org>
Authored: Thu Jan 19 11:52:17 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Jan 19 17:37:42 2017 +0800
----------------------------------------------------------------------
.../main/java/org/apache/kylin/common/KylinConfigBase.java | 6 +++++-
.../kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java | 2 +-
.../java/org/apache/kylin/gridtable/GTAggregateScanner.java | 9 +++++++++
.../org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java | 2 ++
4 files changed, 17 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/21969753/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index d6774ff..74903d5 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -300,6 +300,10 @@ abstract public class KylinConfigBase implements Serializable {
return Integer.parseInt(getOptional("kylin.cube.algorithm.inmem-split-limit", "500"));
}
+ public int getCubeAlgorithmInMemConcurrentThreads() {
+ return Integer.parseInt(getOptional("kylin.cube.algorithm.inmem-concurrent-threads", "1"));
+ }
+
public boolean isIgnoreCubeSignatureInconsistency() {
return Boolean.parseBoolean(getOptional("kylin.cube.ignore-signature-inconsistency", "false"));
}
@@ -744,7 +748,7 @@ abstract public class KylinConfigBase implements Serializable {
}
public int getYarnStatusCheckIntervalSeconds() {
- return Integer.parseInt(getOptional("kylin.engine.mr.yarn-check-interval-seconds", "60"));
+ return Integer.parseInt(getOptional("kylin.engine.mr.yarn-check-interval-seconds", "10"));
}
// ============================================================================
http://git-wip-us.apache.org/repos/asf/kylin/blob/21969753/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
index 651203a..c7a4a05 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
@@ -46,7 +46,7 @@ abstract public class AbstractInMemCubeBuilder {
final protected CubeDesc cubeDesc;
final protected Map<TblColRef, Dictionary<String>> dictionaryMap;
- protected int taskThreadCount = 4;
+ protected int taskThreadCount = 1;
protected int reserveMemoryMB = 100;
public AbstractInMemCubeBuilder(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/21969753/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index 55c04c6..9158aa3 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -36,6 +36,7 @@ import java.util.Map.Entry;
import org.apache.commons.io.IOUtils;
import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.MemoryBudgetController;
import org.apache.kylin.common.util.Pair;
@@ -178,11 +179,16 @@ public class GTAggregateScanner implements IGTScanner {
final List<Dump> dumps;
final int keyLength;
final boolean[] compareMask;
+ boolean compareAll = true;
final BufferedMeasureCodec measureCodec;
final Comparator<byte[]> bytesComparator = new Comparator<byte[]>() {
@Override
public int compare(byte[] o1, byte[] o2) {
+ if (compareAll) {
+ return Bytes.compareTo(o1, o2);
+ }
+
int result = 0;
// profiler shows this check is slow
// Preconditions.checkArgument(keyLength == o1.length && keyLength == o2.length);
@@ -206,6 +212,9 @@ public class GTAggregateScanner implements IGTScanner {
public AggregationCache() {
compareMask = createCompareMask();
+ for (boolean l : compareMask) {
+ compareAll = compareAll && l;
+ }
keyLength = compareMask.length;
dumps = Lists.newArrayList();
aggBufMap = createBuffMap();
http://git-wip-us.apache.org/repos/asf/kylin/blob/21969753/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
index 116d5e0..c0ff2f2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -94,8 +94,10 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr
dictionaryMap.put(col, cubeSegment.getDictionary(col));
}
+ int taskCount = config.getCubeAlgorithmInMemConcurrentThreads();
DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), flatDesc, dictionaryMap);
cubeBuilder.setReserveMemoryMB(calculateReserveMB(context.getConfiguration()));
+ cubeBuilder.setConcurrentThreads(taskCount);
ExecutorService executorService = Executors.newSingleThreadExecutor();
future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new MapContextGTRecordWriter(context, cubeDesc, cubeSegment)));