You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/10/13 03:35:51 UTC

incubator-kylin git commit: minor, various MR parameter tunings: reducer number and its default cap, base cuboid size estimate, and more logs

Repository: incubator-kylin
Updated Branches:
  refs/heads/2.x-staging 155ca3263 -> 672bf421e


minor, various MR parameter tunings: reducer number and its default cap, base cuboid size estimate, and more logs


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/672bf421
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/672bf421
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/672bf421

Branch: refs/heads/2.x-staging
Commit: 672bf421e1689f1f856a0e070c409b07b4ce4935
Parents: 155ca32
Author: Li, Yang <ya...@ebay.com>
Authored: Tue Oct 13 09:35:06 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Tue Oct 13 09:35:06 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/KylinConfig.java    |  2 +-
 .../inmemcubing/AbstractInMemCubeBuilder.java   |  6 +++-
 .../cube/inmemcubing/DoggedCubeBuilder.java     | 29 ++++++++++++++++++--
 .../cube/inmemcubing/InMemCubeBuilder.java      |  3 +-
 .../engine/mr/steps/InMemCuboidMapper.java      |  6 +++-
 .../hbase/steps/HBaseMROutput2Transition.java   | 10 ++++---
 6 files changed, 46 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/672bf421/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 376327a..cd19f4c 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -385,7 +385,7 @@ public class KylinConfig implements Serializable {
     }
 
     public int getHadoopJobMaxReducerNumber() {
-        return Integer.parseInt(getOptional(KYLIN_JOB_MAPREDUCE_MAX_REDUCER_NUMBER, "5000"));
+        return Integer.parseInt(getOptional(KYLIN_JOB_MAPREDUCE_MAX_REDUCER_NUMBER, "500"));
     }
 
     public boolean getRunAsRemoteCommand() {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/672bf421/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
index 7947b28..0a35559 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
@@ -42,7 +42,7 @@ abstract public class AbstractInMemCubeBuilder {
     final protected Map<TblColRef, Dictionary<?>> dictionaryMap;
 
     protected int taskThreadCount = 4;
-    protected int reserveMemoryMB = 100;
+    protected int reserveMemoryMB = 200;
 
     public AbstractInMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
         if (cubeDesc == null)
@@ -61,6 +61,10 @@ abstract public class AbstractInMemCubeBuilder {
     public void setReserveMemoryMB(int mb) {
         this.reserveMemoryMB = mb;
     }
+    
+    public int getReserveMemoryMB() {
+        return this.reserveMemoryMB;
+    }
 
     public Runnable buildAsRunnable(final BlockingQueue<List<String>> input, final ICuboidWriter output) {
         return new Runnable() {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/672bf421/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
index c05bfaf..7fe2122 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
@@ -69,7 +69,22 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
     }
 
     private class BuildOnce {
-        
+        private int cutAheadMB;
+
+        BuildOnce() {
+            int systemAvailMB = MemoryBudgetController.getSystemAvailMB();
+
+            // InMemCubeBuilder will over-estimate base cuboid size by a factor, must cut ahead at least the same factor
+            cutAheadMB = (int) (systemAvailMB * InMemCubeBuilder.BASE_CUBOID_CACHE_OVERSIZE_FACTOR);
+            logger.info("Cut ahead MB is " + cutAheadMB);
+
+            int half = systemAvailMB / 2;
+            if (getReserveMemoryMB() > half) {
+                logger.info("Reserve " + getReserveMemoryMB() + " MB is more than half of system avail " + systemAvailMB + " MB, override to " + half);
+                setReserveMemoryMB(half);
+            }
+        }
+
         public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
             final List<SplitThread> splits = new ArrayList<SplitThread>();
             final Merger merger = new Merger();
@@ -244,7 +259,17 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
 
             logger.debug(splitRowCount + " records went into split #" + nSplit + "; " + systemAvailMB + " MB left, " + reserveMemoryMB + " MB threshold");
 
-            return splitRowCount >= splitRowThreshold || systemAvailMB <= reserveMemoryMB * 1.5;
+            if (splitRowCount >= splitRowThreshold) {
+                logger.info("Split cut due to hitting splitRowThreshold " + splitRowThreshold);
+                return true;
+            }
+
+            if (systemAvailMB <= reserveMemoryMB + cutAheadMB) {
+                logger.info("Split cut due to hitting memory threshold, system avail " + systemAvailMB + " MB <= reserve " + reserveMemoryMB + " MB + cut ahead " + cutAheadMB + " MB");
+                return true;
+            }
+
+            return false;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/672bf421/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index bc01caf..a58f1ef 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -68,6 +68,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
 
     private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder.class);
     private static final LongMutable ONE = new LongMutable(1l);
+    static final double BASE_CUBOID_CACHE_OVERSIZE_FACTOR = 0.1;
 
     private final CuboidScheduler cuboidScheduler;
     private final long baseCuboidId;
@@ -391,7 +392,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
 
         int mbBaseAggrCacheOnHeap = mbAfter == 0 ? 0 : mbBefore - mbAfter;
         int mbEstimateBaseAggrCache = (int) (aggregationScanner.getEstimateSizeOfAggrCache() / MemoryBudgetController.ONE_MB);
-        int mbBaseAggrCache = mbBaseAggrCacheOnHeap;
+        int mbBaseAggrCache = (int) (mbBaseAggrCacheOnHeap * (1 + BASE_CUBOID_CACHE_OVERSIZE_FACTOR));
         mbBaseAggrCache = Math.max(mbBaseAggrCache, 10); // let it be at least 10 MB
         logger.info("Base aggr cache is " + mbBaseAggrCache + " MB (heap " + mbBaseAggrCacheOnHeap + " MB, estimate " + mbEstimateBaseAggrCache + " MB)");
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/672bf421/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
index e69b8a3..2bf627b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -87,8 +87,12 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr
             }
         }
         
-
         DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
+        // Some may want to leave out memory for "mapreduce.task.io.sort.mb", but that is not
+        // necessary, because the output phase comes after all in-mem cubing is done, and by that
+        // time all memory has been released; cuboid data is read from ConcurrentDiskStore.
+        //cubeBuilder.setReserveMemoryMB(mapreduce.task.io.sort.mb);
+        
         ExecutorService executorService = Executors.newSingleThreadExecutor();
         future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new MapContextGTRecordWriter(context, cubeDesc, cubeSegment)));
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/672bf421/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
index 6ccb60b..10f4661 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@ -38,13 +38,14 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeSegment;
@@ -365,9 +366,10 @@ public class HBaseMROutput2Transition implements IMROutput2 {
             String htableName = seg.getStorageLocationIdentifier();
             Configuration conf = HBaseConfiguration.create(job.getConfiguration());
             HTable htable = new HTable(conf, htableName);
-            
             int regions = htable.getStartKeys().length + 1;
-            int reducerNum = regions * 10;
+            htable.close();
+            
+            int reducerNum = regions * 3;
             KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
             reducerNum = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), reducerNum);