You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2017/01/19 09:38:42 UTC

[1/2] kylin git commit: KYLIN-2409 change inmem cubing to single thread by default

Repository: kylin
Updated Branches:
  refs/heads/master 38c3e7bf6 -> 21969753c


KYLIN-2409 change inmem cubing to single thread by default


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/21969753
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/21969753
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/21969753

Branch: refs/heads/master
Commit: 21969753cc66efa22bc9fc933af46346e9846631
Parents: b49c9e3
Author: shaofengshi <sh...@apache.org>
Authored: Thu Jan 19 11:52:17 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Jan 19 17:37:42 2017 +0800

----------------------------------------------------------------------
 .../main/java/org/apache/kylin/common/KylinConfigBase.java  | 6 +++++-
 .../kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java    | 2 +-
 .../java/org/apache/kylin/gridtable/GTAggregateScanner.java | 9 +++++++++
 .../org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java | 2 ++
 4 files changed, 17 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/21969753/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index d6774ff..74903d5 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -300,6 +300,10 @@ abstract public class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.cube.algorithm.inmem-split-limit", "500"));
     }
 
+    public int getCubeAlgorithmInMemConcurrentThreads() {
+        return Integer.parseInt(getOptional("kylin.cube.algorithm.inmem-concurrent-threads", "1"));
+    }
+
     public boolean isIgnoreCubeSignatureInconsistency() {
         return Boolean.parseBoolean(getOptional("kylin.cube.ignore-signature-inconsistency", "false"));
     }
@@ -744,7 +748,7 @@ abstract public class KylinConfigBase implements Serializable {
     }
 
     public int getYarnStatusCheckIntervalSeconds() {
-        return Integer.parseInt(getOptional("kylin.engine.mr.yarn-check-interval-seconds", "60"));
+        return Integer.parseInt(getOptional("kylin.engine.mr.yarn-check-interval-seconds", "10"));
     }
 
     // ============================================================================

http://git-wip-us.apache.org/repos/asf/kylin/blob/21969753/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
index 651203a..c7a4a05 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
@@ -46,7 +46,7 @@ abstract public class AbstractInMemCubeBuilder {
     final protected CubeDesc cubeDesc;
     final protected Map<TblColRef, Dictionary<String>> dictionaryMap;
 
-    protected int taskThreadCount = 4;
+    protected int taskThreadCount = 1;
     protected int reserveMemoryMB = 100;
 
     public AbstractInMemCubeBuilder(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/21969753/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index 55c04c6..9158aa3 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -36,6 +36,7 @@ import java.util.Map.Entry;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.MemoryBudgetController;
 import org.apache.kylin.common.util.Pair;
@@ -178,11 +179,16 @@ public class GTAggregateScanner implements IGTScanner {
         final List<Dump> dumps;
         final int keyLength;
         final boolean[] compareMask;
+        boolean compareAll = true;
         final BufferedMeasureCodec measureCodec;
 
         final Comparator<byte[]> bytesComparator = new Comparator<byte[]>() {
             @Override
             public int compare(byte[] o1, byte[] o2) {
+                if (compareAll) {
+                    return Bytes.compareTo(o1, o2);
+                }
+
                 int result = 0;
                 // profiler shows this check is slow
                 // Preconditions.checkArgument(keyLength == o1.length && keyLength == o2.length);
@@ -206,6 +212,9 @@ public class GTAggregateScanner implements IGTScanner {
 
         public AggregationCache() {
             compareMask = createCompareMask();
+            for (boolean l : compareMask) {
+                compareAll = compareAll && l;
+            }
             keyLength = compareMask.length;
             dumps = Lists.newArrayList();
             aggBufMap = createBuffMap();

http://git-wip-us.apache.org/repos/asf/kylin/blob/21969753/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
index 116d5e0..c0ff2f2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -94,8 +94,10 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr
             dictionaryMap.put(col, cubeSegment.getDictionary(col));
         }
 
+        int taskCount = config.getCubeAlgorithmInMemConcurrentThreads();
         DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), flatDesc, dictionaryMap);
         cubeBuilder.setReserveMemoryMB(calculateReserveMB(context.getConfiguration()));
+        cubeBuilder.setConcurrentThreads(taskCount);
 
         ExecutorService executorService = Executors.newSingleThreadExecutor();
         future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new MapContextGTRecordWriter(context, cubeDesc, cubeSegment)));


[2/2] kylin git commit: KYLIN-2411 Kill MR job on pause

Posted by sh...@apache.org.
KYLIN-2411 Kill MR job on pause


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b49c9e39
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b49c9e39
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b49c9e39

Branch: refs/heads/master
Commit: b49c9e3951c29a21e71f179ee77e2295fbc52ba9
Parents: 38c3e7b
Author: shaofengshi <sh...@apache.org>
Authored: Thu Jan 19 11:02:12 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Jan 19 17:37:42 2017 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/job/execution/ExecutableManager.java    | 2 +-
 .../apache/kylin/engine/mr/common/MapReduceExecutable.java   | 8 ++++++--
 2 files changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/b49c9e39/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index 466cdad..48cedb5 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -347,7 +347,7 @@ public class ExecutableManager {
             for (AbstractExecutable task : tasks) {
                 if (task.getId().compareTo(stepId) >= 0) {
                     logger.debug("rollback task : " + task);
-                    updateJobOutput(task.getId(), ExecutableState.READY, null, null);
+                    updateJobOutput(task.getId(), ExecutableState.READY, Maps.<String, String>newHashMap(), "");
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b49c9e39/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
index f887c4c..6de07ca 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
@@ -148,7 +148,7 @@ public class MapReduceExecutable extends AbstractExecutable {
             //            boolean useKerberosAuth = context.getConfig().isGetJobStatusWithKerberos();
             //            HadoopStatusChecker statusChecker = new HadoopStatusChecker(restStatusCheckUrl, mrJobId, output, useKerberosAuth);
             JobStepStatusEnum status = JobStepStatusEnum.NEW;
-            while (!isDiscarded()) {
+            while (!isDiscarded() && !isPaused()) {
 
                 JobStepStatusEnum newStatus = HadoopJobStatusChecker.checkStatus(job, output);
                 if (status == JobStepStatusEnum.KILLED) {
@@ -184,7 +184,11 @@ public class MapReduceExecutable extends AbstractExecutable {
                 }
             }
 
-            return new ExecuteResult(ExecuteResult.State.DISCARDED, output.toString());
+            if (isDiscarded()) {
+                return new ExecuteResult(ExecuteResult.State.DISCARDED, output.toString());
+            } else {
+                return new ExecuteResult(ExecuteResult.State.STOPPED, output.toString());
+            }
 
         } catch (ReflectiveOperationException e) {
             logger.error("error getMapReduceJobClass, class name:" + getParam(KEY_MR_JOB), e);