You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by nj...@apache.org on 2017/12/02 17:24:01 UTC

[08/19] kylin git commit: APACHE-KYLIN-2732: Introduce base cuboid as a new input for cubing job

APACHE-KYLIN-2732: Introduce base cuboid as a new input for cubing job

Signed-off-by: Zhong <nj...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/bdf0f69c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/bdf0f69c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/bdf0f69c

Branch: refs/heads/master
Commit: bdf0f69c96e4cf21289168c49b778c741e06ae83
Parents: a94b479
Author: Wang Ken <mi...@ebay.com>
Authored: Mon Aug 28 11:34:17 2017 +0800
Committer: Zhong <nj...@apache.org>
Committed: Sat Dec 2 23:21:43 2017 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/cube/CubeInstance.java     |  33 +++
 .../cube/cuboid/TreeCuboidSchedulerManager.java | 102 +++++++++
 .../inmemcubing/AbstractInMemCubeBuilder.java   |  13 +-
 .../ConsumeBlockingQueueController.java         |  84 ++++++++
 .../cube/inmemcubing/DoggedCubeBuilder.java     | 117 ++--------
 .../cube/inmemcubing/InMemCubeBuilder.java      |  87 ++------
 .../InMemCubeBuilderInputConverter.java         | 149 -------------
 .../kylin/cube/inmemcubing/InputConverter.java  |  69 ++++++
 .../cube/inmemcubing/InputConverterUnit.java    |  33 +++
 .../InputConverterUnitForBaseCuboid.java        |  49 +++++
 .../InputConverterUnitForRawData.java           | 159 ++++++++++++++
 .../RecordConsumeBlockingQueueController.java   |  91 ++++++++
 .../org/apache/kylin/gridtable/GTRecord.java    |   5 +
 .../engine/mr/common/AbstractHadoopJob.java     |  12 +-
 .../kylin/engine/mr/common/BatchConstants.java  |   1 +
 .../engine/mr/steps/InMemCuboidMapper.java      | 129 +++--------
 .../engine/mr/steps/InMemCuboidMapperBase.java  | 216 +++++++++++++++++++
 .../ITDoggedCubeBuilderStressTest.java          |   3 +-
 .../inmemcubing/ITDoggedCubeBuilderTest.java    |   3 +-
 .../inmemcubing/ITInMemCubeBuilderTest.java     |  14 +-
 20 files changed, 944 insertions(+), 425 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/bdf0f69c/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index cc56727..f6eceb6 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -18,6 +18,9 @@
 
 package org.apache.kylin.cube;
 
+import static org.apache.kylin.cube.cuboid.CuboidModeEnum.CURRENT;
+import static org.apache.kylin.cube.cuboid.CuboidModeEnum.RECOMMEND;
+
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -29,6 +32,7 @@ import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
 import org.apache.kylin.common.util.CompressionUtils;
 import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.cube.cuboid.CuboidModeEnum;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.cuboid.TreeCuboidScheduler;
 import org.apache.kylin.cube.model.CubeDesc;
@@ -327,6 +331,35 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
         this.createTimeUTC = createTimeUTC;
     }
 
+    public Set<Long> getCuboidsByMode(String cuboidModeName) {
+        return getCuboidsByMode(cuboidModeName == null ? null : CuboidModeEnum.getByModeName(cuboidModeName));
+    }
+
+    public Set<Long> getCuboidsByMode(CuboidModeEnum cuboidMode) {
+        if (cuboidMode == null || cuboidMode == CURRENT) {
+            return getCuboidScheduler().getAllCuboidIds();
+        }
+        Set<Long> cuboidsRecommend = getCuboidsRecommend();
+        if (cuboidsRecommend == null || cuboidMode == RECOMMEND) {
+            return cuboidsRecommend;
+        }
+        Set<Long> currentCuboids = getCuboidScheduler().getAllCuboidIds();
+        switch (cuboidMode) {
+        case RECOMMEND_EXISTING:
+            cuboidsRecommend.retainAll(currentCuboids);
+            return cuboidsRecommend;
+        case RECOMMEND_MISSING:
+            cuboidsRecommend.removeAll(currentCuboids);
+            return cuboidsRecommend;
+        case RECOMMEND_MISSING_WITH_BASE:
+            cuboidsRecommend.removeAll(currentCuboids);
+            cuboidsRecommend.add(getCuboidScheduler().getBaseCuboidId()); // base cuboid belongs in the returned set
+            return cuboidsRecommend;
+        default:
+            return null;
+        }
+    }
+
     public Map<Long, Long> getCuboids() {
         if (cuboidBytes == null)
             return null;

http://git-wip-us.apache.org/repos/asf/kylin/blob/bdf0f69c/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidSchedulerManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidSchedulerManager.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidSchedulerManager.java
new file mode 100644
index 0000000..5e8d965
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidSchedulerManager.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.cuboid;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class TreeCuboidSchedulerManager {
+    private static ConcurrentMap<String, TreeCuboidScheduler> cache = Maps.newConcurrentMap();
+
+    private class TreeCuboidSchedulerSyncListener extends Broadcaster.Listener {
+        @Override
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
+            cache.clear();
+        }
+
+        @Override
+        public void onEntityChange(Broadcaster broadcaster, String entity, Broadcaster.Event event, String cacheKey)
+                throws IOException {
+            cache.remove(cacheKey);
+        }
+    }
+
+    public TreeCuboidSchedulerManager() {
+        Broadcaster.getInstance(KylinConfig.getInstanceFromEnv())
+                .registerListener(new TreeCuboidSchedulerSyncListener(), "cube");
+    }
+
+    private static TreeCuboidSchedulerManager instance = new TreeCuboidSchedulerManager();
+
+    public static TreeCuboidSchedulerManager getInstance() {
+        return instance;
+    }
+
+    /**
+     *
+     * @param cubeName
+     * @return null if the cube has no pre-built cuboids
+     */
+    public static TreeCuboidScheduler getTreeCuboidScheduler(String cubeName) {
+        TreeCuboidScheduler result = cache.get(cubeName);
+        if (result == null) {
+            CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+            CubeInstance cubeInstance = cubeManager.getCube(cubeName);
+            if (cubeInstance == null) {
+                return null;
+            }
+            TreeCuboidScheduler treeCuboidScheduler = getTreeCuboidScheduler(cubeInstance.getDescriptor(),
+                    cubeInstance.getCuboids());
+            if (treeCuboidScheduler == null) {
+                return null;
+            }
+            cache.put(cubeName, treeCuboidScheduler);
+            result = treeCuboidScheduler;
+        }
+        return result;
+    }
+
+    public static TreeCuboidScheduler getTreeCuboidScheduler(CubeDesc cubeDesc, Map<Long, Long> cuboidsWithRowCnt) {
+        if (cuboidsWithRowCnt == null || cuboidsWithRowCnt.isEmpty()) {
+            return null;
+        }
+        return getTreeCuboidScheduler(cubeDesc, Lists.newArrayList(cuboidsWithRowCnt.keySet()), cuboidsWithRowCnt);
+    }
+
+    public static TreeCuboidScheduler getTreeCuboidScheduler(CubeDesc cubeDesc, List<Long> cuboidIds,
+            Map<Long, Long> cuboidsWithRowCnt) {
+        if (cuboidIds == null || cuboidsWithRowCnt == null) {
+            return null;
+        }
+        TreeCuboidScheduler treeCuboidScheduler = new TreeCuboidScheduler(cubeDesc, cuboidIds,
+                new TreeCuboidScheduler.CuboidCostComparator(cuboidsWithRowCnt));
+        return treeCuboidScheduler;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/bdf0f69c/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
index 952926c..df1fa7a 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
@@ -19,7 +19,6 @@
 package org.apache.kylin.cube.inmemcubing;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 
@@ -78,12 +77,17 @@ abstract public class AbstractInMemCubeBuilder {
         return this.reserveMemoryMB;
     }
 
-    public Runnable buildAsRunnable(final BlockingQueue<List<String>> input, final ICuboidWriter output) {
+    public Runnable buildAsRunnable(final BlockingQueue<String[]> input, final ICuboidWriter output) {
+        return buildAsRunnable(input, new InputConverterUnitForRawData(cubeDesc, flatDesc, dictionaryMap), output);
+    }
+
+    public <T> Runnable buildAsRunnable(final BlockingQueue<T> input, final InputConverterUnit<T> inputConverterUnit,
+            final ICuboidWriter output) {
         return new Runnable() {
             @Override
             public void run() {
                 try {
-                    build(input, output);
+                    build(input, inputConverterUnit, output);
                 } catch (IOException e) {
                     throw new RuntimeException(e);
                 }
@@ -91,7 +95,8 @@ abstract public class AbstractInMemCubeBuilder {
         };
     }
 
-    abstract public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException;
+    abstract public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit,
+            ICuboidWriter output) throws IOException;
 
     protected void outputCuboid(long cuboidId, GridTable gridTable, ICuboidWriter output) throws IOException {
         long startTime = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/kylin/blob/bdf0f69c/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java
new file mode 100644
index 0000000..a9e55f7
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.inmemcubing;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.Lists;
+
+public class ConsumeBlockingQueueController<T> implements Iterator<T> {
+    public final static int DEFAULT_BATCH_SIZE = 1000;
+
+    private volatile boolean hasException = false;
+    private final BlockingQueue<T> input;
+    private final int batchSize;
+    private final List<T> batchBuffer;
+    private Iterator<T> internalIT;
+
+    private AtomicInteger outputRowCount = new AtomicInteger();
+
+    public ConsumeBlockingQueueController(BlockingQueue<T> input, int batchSize) {
+        this.input = input;
+        this.batchSize = batchSize;
+        this.batchBuffer = Lists.newArrayListWithExpectedSize(batchSize);
+        this.internalIT = batchBuffer.iterator();
+    }
+
+    @Override
+    public boolean hasNext() {
+        if (hasException) {
+            return false;
+        }
+        if (internalIT.hasNext()) {
+            return true;
+        }
+        batchBuffer.clear();
+        try {
+            batchBuffer.add(input.take());
+            outputRowCount.incrementAndGet();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+        outputRowCount.addAndGet(input.drainTo(batchBuffer, batchSize - 1));
+        internalIT = batchBuffer.iterator();
+        return true;
+    }
+
+    @Override
+    public T next() {
+        return internalIT.next();
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+
+    public void findException() {
+        hasException = true;
+    }
+
+    public long getOutputRowCount() {
+        return outputRowCount.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/bdf0f69c/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
index dd92a2b..ccd7137 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
@@ -20,21 +20,17 @@ package org.apache.kylin.cube.inmemcubing;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.common.util.MemoryBudgetController;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTScanRequestBuilder;
@@ -55,7 +51,7 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
     private static Logger logger = LoggerFactory.getLogger(DoggedCubeBuilder.class);
 
     private int splitRowThreshold = Integer.MAX_VALUE;
-    private int unitRows = 1000;
+    private int unitRows = ConsumeBlockingQueueController.DEFAULT_BATCH_SIZE;
 
     public DoggedCubeBuilder(CuboidScheduler cuboidScheduler, IJoinedFlatTableDesc flatDesc,
             Map<TblColRef, Dictionary<String>> dictionaryMap) {
@@ -72,8 +68,9 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
     }
 
     @Override
-    public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
-        new BuildOnce().build(input, output);
+    public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit, ICuboidWriter output)
+            throws IOException {
+        new BuildOnce().build(input, inputConverterUnit, output);
     }
 
     private class BuildOnce {
@@ -81,7 +78,8 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
         BuildOnce() {
         }
 
-        public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
+        public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit, ICuboidWriter output)
+                throws IOException {
             final List<SplitThread> splits = new ArrayList<SplitThread>();
             final Merger merger = new Merger();
 
@@ -89,32 +87,23 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
             logger.info("Dogged Cube Build start");
 
             try {
-                SplitThread last = null;
-                boolean eof = false;
+                while (true) {
+                    SplitThread last = new SplitThread(splits.size() + 1, RecordConsumeBlockingQueueController
+                            .getQueueController(inputConverterUnit, input, unitRows));
+                    splits.add(last);
 
-                while (!eof) {
+                    last.start();
+                    logger.info("Split #" + splits.size() + " kickoff");
 
-                    if (last != null && shouldCutSplit(splits)) {
-                        cutSplit(last);
-                        last = null;
-                    }
+                    // Build splits sequentially
+                    last.join();
 
                     checkException(splits);
-
-                    if (last == null) {
-                        last = new SplitThread();
-                        splits.add(last);
-                        last.start();
-                        logger.info("Split #" + splits.size() + " kickoff");
+                    if (last.inputController.ifEnd()) {
+                        break;
                     }
-
-                    eof = feedSomeInput(input, last, unitRows);
                 }
 
-                for (SplitThread split : splits) {
-                    split.join();
-                }
-                checkException(splits);
                 logger.info("Dogged Cube Build splits complete, took " + (System.currentTimeMillis() - start) + " ms");
 
                 merger.mergeAndOutput(splits, output);
@@ -202,81 +191,18 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
                 throw new IOException(errors.size() + " exceptions during in-mem cube build, cause set to the first, check log for more", errors.get(0));
             }
         }
-
-        private boolean feedSomeInput(BlockingQueue<List<String>> input, SplitThread split, int n) {
-            try {
-                int i = 0;
-                while (i < n) {
-                    List<String> record = input.take();
-                    i++;
-
-                    while (split.inputQueue.offer(record, 1, TimeUnit.SECONDS) == false) {
-                        if (split.exception != null)
-                            return true; // got some error
-                    }
-                    split.inputRowCount++;
-
-                    if (record == null || record.isEmpty()) {
-                        return true;
-                    }
-                }
-                return false;
-
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new RuntimeException(e);
-            }
-        }
-
-        private void cutSplit(SplitThread last) {
-            try {
-                // signal the end of input
-                while (last.isAlive()) {
-                    if (last.inputQueue.offer(Collections.<String> emptyList())) {
-                        break;
-                    }
-                    Thread.sleep(1000);
-                }
-
-                // wait cuboid build done
-                last.join();
-
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new RuntimeException(e);
-            }
-        }
-
-        private boolean shouldCutSplit(List<SplitThread> splits) {
-            int systemAvailMB = MemoryBudgetController.getSystemAvailMB();
-            int nSplit = splits.size();
-            long splitRowCount = nSplit == 0 ? 0 : splits.get(nSplit - 1).inputRowCount;
-
-            logger.info(splitRowCount + " records went into split #" + nSplit + "; " + systemAvailMB + " MB left, " + reserveMemoryMB + " MB threshold");
-
-            if (splitRowCount >= splitRowThreshold) {
-                logger.info("Split cut due to hitting splitRowThreshold " + splitRowThreshold);
-                return true;
-            }
-
-            if (systemAvailMB <= reserveMemoryMB * 1.5) {
-                logger.info("Split cut due to hitting memory threshold, system avail " + systemAvailMB + " MB <= reserve " + reserveMemoryMB + "*1.5 MB");
-                return true;
-            }
-
-            return false;
-        }
     }
 
     private class SplitThread extends Thread {
-        final BlockingQueue<List<String>> inputQueue = new ArrayBlockingQueue<List<String>>(16);
+        final RecordConsumeBlockingQueueController<?> inputController;
         final InMemCubeBuilder builder;
 
         ConcurrentNavigableMap<Long, CuboidResult> buildResult;
-        long inputRowCount = 0;
         RuntimeException exception;
 
-        public SplitThread() {
+        public SplitThread(final int num, final RecordConsumeBlockingQueueController<?> inputController) {
+            super("SplitThread" + num);
+            this.inputController = inputController;
             this.builder = new InMemCubeBuilder(cuboidScheduler, flatDesc, dictionaryMap);
             this.builder.setConcurrentThreads(taskThreadCount);
             this.builder.setReserveMemoryMB(reserveMemoryMB);
@@ -285,12 +211,13 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
         @Override
         public void run() {
             try {
-                buildResult = builder.build(inputQueue);
+                buildResult = builder.build(inputController);
             } catch (Exception e) {
                 if (e instanceof RuntimeException)
                     this.exception = (RuntimeException) e;
                 else
                     this.exception = new RuntimeException(e);
+                inputController.findException();
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/bdf0f69c/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index 684c26b..f63b53f 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -87,6 +87,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
     private CuboidResult baseResult;
     private Object[] totalSumForSanityCheck;
     private ICuboidCollector resultCollector;
+    private boolean ifBaseCuboidCollected = true;
 
     public InMemCubeBuilder(CuboidScheduler cuboidScheduler, IJoinedFlatTableDesc flatDesc,
             Map<TblColRef, Dictionary<String>> dictionaryMap) {
@@ -121,8 +122,10 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
     }
 
     @Override
-    public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
-        ConcurrentNavigableMap<Long, CuboidResult> result = build(input);
+    public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit, ICuboidWriter output)
+            throws IOException {
+        ConcurrentNavigableMap<Long, CuboidResult> result = build(
+                RecordConsumeBlockingQueueController.getQueueController(inputConverterUnit, input));
         try {
             for (CuboidResult cuboidResult : result.values()) {
                 outputCuboid(cuboidResult.cuboidId, cuboidResult.table, output);
@@ -133,7 +136,11 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         }
     }
 
-    public ConcurrentNavigableMap<Long, CuboidResult> build(BlockingQueue<List<String>> input) throws IOException {
+    public <T> ConcurrentNavigableMap<Long, CuboidResult> build(RecordConsumeBlockingQueueController<T> input)
+            throws IOException {
+        if (input.inputConverterUnit instanceof InputConverterUnitForBaseCuboid) {
+            ifBaseCuboidCollected = false;
+        }
         final ConcurrentNavigableMap<Long, CuboidResult> result = new ConcurrentSkipListMap<Long, CuboidResult>();
         build(input, new ICuboidCollector() {
             @Override
@@ -150,7 +157,8 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         void collect(CuboidResult result);
     }
 
-    private void build(BlockingQueue<List<String>> input, ICuboidCollector collector) throws IOException {
+    private <T> void build(RecordConsumeBlockingQueueController<T> input, ICuboidCollector collector)
+            throws IOException {
         long startTime = System.currentTimeMillis();
         logger.info("In Mem Cube Build start, " + cubeDesc.getName());
 
@@ -326,7 +334,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         memBudget = new MemoryBudgetController(budget);
     }
 
-    private CuboidResult createBaseCuboid(BlockingQueue<List<String>> input) throws IOException {
+    private <T> CuboidResult createBaseCuboid(RecordConsumeBlockingQueueController<T> input) throws IOException {
         long startTime = System.currentTimeMillis();
         logger.info("Calculating base cuboid " + baseCuboidId);
 
@@ -356,10 +364,15 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         int mbEstimateBaseAggrCache = (int) (aggregationScanner.getEstimateSizeOfAggrCache() / MemoryBudgetController.ONE_MB);
         logger.info("Wild estimate of base aggr cache is " + mbEstimateBaseAggrCache + " MB");
 
-        return updateCuboidResult(baseCuboidId, baseCuboid, count, timeSpent, 0);
+        return updateCuboidResult(baseCuboidId, baseCuboid, count, timeSpent, 0, ifBaseCuboidCollected);
     }
 
     private CuboidResult updateCuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB) {
+        return updateCuboidResult(cuboidId, table, nRows, timeSpent, aggrCacheMB, true);
+    }
+
+    private CuboidResult updateCuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB,
+            boolean ifCollect) {
         if (aggrCacheMB <= 0 && baseResult != null) {
             aggrCacheMB = (int) Math.round(//
                     (DERIVE_AGGR_CACHE_CONSTANT_FACTOR + DERIVE_AGGR_CACHE_VARIABLE_FACTOR * nRows / baseResult.nRows) //
@@ -369,7 +382,9 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         CuboidResult result = new CuboidResult(cuboidId, table, nRows, timeSpent, aggrCacheMB);
         taskCuboidCompleted.incrementAndGet();
 
-        resultCollector.collect(result);
+        if (ifCollect) {
+            resultCollector.collect(result);
+        }
         return result;
     }
 
@@ -508,62 +523,4 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
             return comp < 0 ? -1 : (comp > 0 ? 1 : 0);
         }
     }
-
-    // ============================================================================
-
-    private class InputConverter implements IGTScanner {
-        GTInfo info;
-        GTRecord record;
-        BlockingQueue<List<String>> input;
-        final InMemCubeBuilderInputConverter inMemCubeBuilderInputConverter;
-
-        public InputConverter(GTInfo info, BlockingQueue<List<String>> input) {
-            this.info = info;
-            this.input = input;
-            this.record = new GTRecord(info);
-            this.inMemCubeBuilderInputConverter = new InMemCubeBuilderInputConverter(cubeDesc, flatDesc, dictionaryMap, info);
-        }
-
-        @Override
-        public Iterator<GTRecord> iterator() {
-            return new Iterator<GTRecord>() {
-
-                List<String> currentObject = null;
-
-                @Override
-                public boolean hasNext() {
-                    try {
-                        currentObject = input.take();
-                    } catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
-                        throw new RuntimeException(e);
-                    }
-                    return currentObject != null && currentObject.size() > 0;
-                }
-
-                @Override
-                public GTRecord next() {
-                    if (currentObject.size() == 0)
-                        throw new IllegalStateException();
-
-                    inMemCubeBuilderInputConverter.convert(currentObject, record);
-                    return record;
-                }
-
-                @Override
-                public void remove() {
-                    throw new UnsupportedOperationException();
-                }
-            };
-        }
-
-        @Override
-        public void close() throws IOException {
-        }
-
-        @Override
-        public GTInfo getInfo() {
-            return info;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/bdf0f69c/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
deleted file mode 100644
index 6dd20d8..0000000
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.cube.inmemcubing;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
-import org.apache.kylin.gridtable.GTInfo;
-import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.measure.MeasureIngester;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.ParameterDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/** Converts one flat-table row ({@code List<String>}) into a {@link GTRecord}: row-key dimension values first, then one value per measure.
- *  Dimension values equal to HIVE_NULL or to one of the cube's configured null strings become null. */
-public class InMemCubeBuilderInputConverter {
-
-    @SuppressWarnings("unused")
-    private static final Logger logger = LoggerFactory.getLogger(InMemCubeBuilderInputConverter.class);
-    
-    public static final byte[] HIVE_NULL = Bytes.toBytes("\\N");
-
-    private final CubeJoinedFlatTableEnrich flatDesc;
-    private final MeasureDesc[] measureDescs;
-    private final MeasureIngester<?>[] measureIngesters;
-    private final int measureCount;
-    private final Map<TblColRef, Dictionary<String>> dictionaryMap;
-    private final GTInfo gtInfo;
-    protected List<byte[]> nullBytes;
-
-    public InMemCubeBuilderInputConverter(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDesc, Map<TblColRef, Dictionary<String>> dictionaryMap, GTInfo gtInfo) {
-        this.gtInfo = gtInfo;
-        this.flatDesc = new CubeJoinedFlatTableEnrich(flatDesc, cubeDesc);
-        this.measureCount = cubeDesc.getMeasures().size();
-        this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
-        this.measureIngesters = MeasureIngester.create(cubeDesc.getMeasures());
-        this.dictionaryMap = dictionaryMap;
-        initNullBytes(cubeDesc);
-    }
-
-    public final GTRecord convert(List<String> row) {
-        final GTRecord record = new GTRecord(gtInfo);
-        convert(row, record);
-        return record;
-    }
-
-    public final void convert(List<String> row, GTRecord record) {
-        Object[] dimensions = buildKey(row);
-        Object[] metricsValues = buildValue(row);
-        Object[] recordValues = new Object[dimensions.length + metricsValues.length];
-        System.arraycopy(dimensions, 0, recordValues, 0, dimensions.length);
-        System.arraycopy(metricsValues, 0, recordValues, dimensions.length, metricsValues.length);
-        record.setValues(recordValues);
-    }
-
-    private Object[] buildKey(List<String> row) {
-        int keySize = flatDesc.getRowKeyColumnIndexes().length;
-        Object[] key = new Object[keySize];
-
-        for (int i = 0; i < keySize; i++) {
-            key[i] = row.get(flatDesc.getRowKeyColumnIndexes()[i]);
-            if (key[i] != null && isNull(Bytes.toBytes((String) key[i]))) {
-                key[i] = null;
-            }
-        }
-
-        return key;
-    }
-
-    private Object[] buildValue(List<String> row) {
-        Object[] values = new Object[measureCount];
-        for (int i = 0; i < measureCount; i++) {
-            values[i] = buildValueOf(i, row);
-        }
-        return values;
-    }
-
-    private Object buildValueOf(int idxOfMeasure, List<String> row) {
-        MeasureDesc measure = measureDescs[idxOfMeasure];
-        FunctionDesc function = measure.getFunction();
-        int[] colIdxOnFlatTable = flatDesc.getMeasureColumnIndexes()[idxOfMeasure];
-
-        int paramCount = function.getParameterCount();
-        String[] inputToMeasure = new String[paramCount];
-
-        // pick up parameter values
-        ParameterDesc param = function.getParameter();
-        int paramColIdx = 0; // index among parameters of column type
-        for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) {
-            String value;
-            if (function.isCount()) {
-                value = "1";
-            } else if (param.isColumnType()) {
-                value = row.get(colIdxOnFlatTable[paramColIdx++]);
-            } else {
-                value = param.getValue();
-            }
-            inputToMeasure[i] = value;
-        }
-
-        return measureIngesters[idxOfMeasure].valueOf(inputToMeasure, measure, dictionaryMap);
-    }
-
-    private void initNullBytes(CubeDesc cubeDesc) {
-        nullBytes = Lists.newArrayList();
-        nullBytes.add(HIVE_NULL);
-        String[] nullStrings = cubeDesc.getNullStrings();
-        if (nullStrings != null) {
-            for (String s : nullStrings) {
-                nullBytes.add(Bytes.toBytes(s));
-            }
-        }
-    }
-
-    private boolean isNull(byte[] v) {
-        for (byte[] nullByte : nullBytes) {
-            if (Bytes.equals(v, nullByte))
-                return true;
-        }
-        return false;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/bdf0f69c/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverter.java
new file mode 100644
index 0000000..664f784
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverter.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.inmemcubing;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.IGTScanner;
+
+public class InputConverter<T> implements IGTScanner {
+    private GTInfo info;
+    private GTRecord record;
+    private RecordConsumeBlockingQueueController<T> inputController;
+
+    public InputConverter(GTInfo info, RecordConsumeBlockingQueueController<T> inputController) {
+        this.info = info;
+        this.inputController = inputController;
+        this.record = new GTRecord(info);
+    }
+
+    @Override
+    public Iterator<GTRecord> iterator() {
+        return new Iterator<GTRecord>() {
+
+            @Override
+            public boolean hasNext() {
+                return inputController.hasNext();
+            }
+
+            @Override
+            public GTRecord next() {
+                inputController.inputConverterUnit.convert(inputController.next(), record);
+                return record;
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+        };
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public GTInfo getInfo() {
+        return info;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/bdf0f69c/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnit.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnit.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnit.java
new file mode 100644
index 0000000..fe32937
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnit.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.inmemcubing;
+
+import org.apache.kylin.gridtable.GTRecord;
+
+/**
+ * Converts input elements of type {@code T} into {@link GTRecord}s and defines the marker elements
+ * that signal end of input and a cut.
+ */
+public interface InputConverterUnit<T> {
+
+    /** Fills {@code record} with the values carried by {@code currentObject}. */
+    void convert(T currentObject, GTRecord record);
+
+    /** Returns whether {@code currentObject} is the end-of-input marker, see {@link #getEmptyUnit()}. */
+    boolean ifEnd(T currentObject);
+
+    /** Returns whether {@code currentObject} is the cut marker, see {@link #getCutUnit()}. */
+    boolean ifCut(T currentObject);
+
+    /** Returns the marker element signalling end of input. */
+    T getEmptyUnit();
+
+    /** Returns the marker element signalling a cut. */
+    T getCutUnit();
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/bdf0f69c/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForBaseCuboid.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForBaseCuboid.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForBaseCuboid.java
new file mode 100644
index 0000000..9110a87
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForBaseCuboid.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.inmemcubing;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.gridtable.GTRecord;
+
+/**
+ * {@link InputConverterUnit} for input that already holds base cuboid rows: each {@link ByteArray}
+ * is loaded into all columns of the record via {@code GTRecord#loadColumns(ByteBuffer)}.
+ * <p>
+ * {@link #EMPTY_ROW} and {@link #CUT_ROW} are recognised by reference identity, not by content, so
+ * only these exact instances act as end-of-input and cut markers.
+ */
+public class InputConverterUnitForBaseCuboid implements InputConverterUnit<ByteArray> {
+
+    public static final ByteArray EMPTY_ROW = new ByteArray();
+    public static final ByteArray CUT_ROW = new ByteArray(0);
+
+    /** Points all columns of {@code record} at the bytes of {@code currentObject}. */
+    @Override
+    public void convert(ByteArray currentObject, GTRecord record) {
+        record.loadColumns(currentObject.asBuffer());
+    }
+
+    @Override
+    public boolean ifEnd(ByteArray currentObject) {
+        return currentObject == EMPTY_ROW;
+    }
+
+    @Override
+    public boolean ifCut(ByteArray currentObject) {
+        return currentObject == CUT_ROW;
+    }
+
+    @Override
+    public ByteArray getEmptyUnit() {
+        return EMPTY_ROW;
+    }
+
+    @Override
+    public ByteArray getCutUnit() {
+        return CUT_ROW;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/bdf0f69c/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java
new file mode 100644
index 0000000..f6548b2
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.cube.inmemcubing;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.ParameterDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ */
+public class InputConverterUnitForRawData implements InputConverterUnit<String[]> {
+
+    @SuppressWarnings("unused")
+    private static final Logger logger = LoggerFactory.getLogger(InputConverterUnitForRawData.class);
+    
+    public static final byte[] HIVE_NULL = Bytes.toBytes("\\N");
+    public static final String[] EMPTY_ROW = new String[0];
+    public static final String[] CUT_ROW = { "" };
+
+    private final CubeJoinedFlatTableEnrich flatDesc;
+    private final MeasureDesc[] measureDescs;
+    private final MeasureIngester<?>[] measureIngesters;
+    private final int measureCount;
+    private final Map<TblColRef, Dictionary<String>> dictionaryMap;
+    protected List<byte[]> nullBytes;
+
+    public InputConverterUnitForRawData(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDesc,
+            Map<TblColRef, Dictionary<String>> dictionaryMap) {
+        this.flatDesc = new CubeJoinedFlatTableEnrich(flatDesc, cubeDesc);
+        this.measureCount = cubeDesc.getMeasures().size();
+        this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
+        this.measureIngesters = MeasureIngester.create(cubeDesc.getMeasures());
+        this.dictionaryMap = dictionaryMap;
+        initNullBytes(cubeDesc);
+    }
+
+    public final void convert(String[] row, GTRecord record) {
+        Object[] dimensions = buildKey(row);
+        Object[] metricsValues = buildValue(row);
+        Object[] recordValues = new Object[dimensions.length + metricsValues.length];
+        System.arraycopy(dimensions, 0, recordValues, 0, dimensions.length);
+        System.arraycopy(metricsValues, 0, recordValues, dimensions.length, metricsValues.length);
+        record.setValues(recordValues);
+    }
+
+    public boolean ifEnd(String[] currentObject) {
+        return currentObject == EMPTY_ROW;
+    }
+
+    public boolean ifCut(String[] currentObject) {
+        return currentObject == CUT_ROW;
+    }
+
+    public String[] getEmptyUnit() {
+        return EMPTY_ROW;
+    }
+
+    public String[] getCutUnit() {
+        return CUT_ROW;
+    }
+
+    private Object[] buildKey(String[] row) {
+        int keySize = flatDesc.getRowKeyColumnIndexes().length;
+        Object[] key = new Object[keySize];
+
+        for (int i = 0; i < keySize; i++) {
+            key[i] = row[flatDesc.getRowKeyColumnIndexes()[i]];
+            if (key[i] != null && isNull(Bytes.toBytes((String) key[i]))) {
+                key[i] = null;
+            }
+        }
+
+        return key;
+    }
+
+    private Object[] buildValue(String[] row) {
+        Object[] values = new Object[measureCount];
+        for (int i = 0; i < measureCount; i++) {
+            values[i] = buildValueOf(i, row);
+        }
+        return values;
+    }
+
+    private Object buildValueOf(int idxOfMeasure, String[] row) {
+        MeasureDesc measure = measureDescs[idxOfMeasure];
+        FunctionDesc function = measure.getFunction();
+        int[] colIdxOnFlatTable = flatDesc.getMeasureColumnIndexes()[idxOfMeasure];
+
+        int paramCount = function.getParameterCount();
+        String[] inputToMeasure = new String[paramCount];
+
+        // pick up parameter values
+        ParameterDesc param = function.getParameter();
+        int paramColIdx = 0; // index among parameters of column type
+        for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) {
+            String value;
+            if (function.isCount()) {
+                value = "1";
+            } else if (param.isColumnType()) {
+                value = row[colIdxOnFlatTable[paramColIdx++]];
+            } else {
+                value = param.getValue();
+            }
+            inputToMeasure[i] = value;
+        }
+
+        return measureIngesters[idxOfMeasure].valueOf(inputToMeasure, measure, dictionaryMap);
+    }
+
+    private void initNullBytes(CubeDesc cubeDesc) {
+        nullBytes = Lists.newArrayList();
+        nullBytes.add(HIVE_NULL);
+        String[] nullStrings = cubeDesc.getNullStrings();
+        if (nullStrings != null) {
+            for (String s : nullStrings) {
+                nullBytes.add(Bytes.toBytes(s));
+            }
+        }
+    }
+
+    private boolean isNull(byte[] v) {
+        for (byte[] nullByte : nullBytes) {
+            if (Bytes.equals(v, nullByte))
+                return true;
+        }
+        return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/bdf0f69c/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java
new file mode 100644
index 0000000..49cbe1f
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.inmemcubing;
+
+import java.util.concurrent.BlockingQueue;
+
+public class RecordConsumeBlockingQueueController<T> extends ConsumeBlockingQueueController<T> {
+
+    public final InputConverterUnit<T> inputConverterUnit;
+
+    private RecordConsumeBlockingQueueController(InputConverterUnit<T> inputConverterUnit, BlockingQueue<T> input, int batchSize) {
+        super(input, batchSize);
+        this.inputConverterUnit = inputConverterUnit;
+    }
+   
+    private T currentObject = null;
+    private volatile boolean ifEnd = false;
+    private volatile boolean cut = false;
+    private long outputRowCountCut = 0L;
+
+    @Override
+    public boolean hasNext() {
+        if (currentObject != null) {
+            return hasNext(currentObject);
+        }
+        if (!super.hasNext()) {
+            return false;
+        }
+        currentObject = super.next();
+        return hasNext(currentObject);
+    }
+
+    @Override
+    public T next() {
+        if (ifEnd())
+            throw new IllegalStateException();
+
+        T result = currentObject;
+        currentObject = null;
+        return result;
+    }
+
+    public boolean ifEnd() {
+        return ifEnd;
+    }
+
+    private boolean hasNext(T object) {
+        if (inputConverterUnit.ifEnd(object)) {
+            ifEnd = true;
+            return false;
+        }else if(cut){
+            return false;
+        }else if(inputConverterUnit.ifCut(object)){
+            return false;
+        }
+        return true;
+    }
+    
+    public static <T> RecordConsumeBlockingQueueController<T> getQueueController(InputConverterUnit<T> inputConverterUnit, BlockingQueue<T> input){
+        return new RecordConsumeBlockingQueueController<>(inputConverterUnit, input, DEFAULT_BATCH_SIZE);
+    }
+    
+    public static <T> RecordConsumeBlockingQueueController<T> getQueueController(InputConverterUnit<T> inputConverterUnit, BlockingQueue<T> input, int batchSize){
+        return new RecordConsumeBlockingQueueController<>(inputConverterUnit, input, batchSize);
+    }
+
+    public void forceCutPipe() {
+        cut = true;
+        outputRowCountCut = getOutputRowCount();
+    }
+
+    public long getOutputRowCountAfterCut() {
+        return getOutputRowCount() - outputRowCountCut;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/bdf0f69c/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
index ba3b1c4..36bd095 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
@@ -251,6 +251,11 @@ public class GTRecord implements Comparable<GTRecord> {
         loadColumns(info.colBlocks[c], buf);
     }
 
+    /**
+     * Loads every column of this record from {@code buf}. Like the other loadColumns variants, this
+     * only changes pointers to point to data in the given buffer, UNLIKE deserialize.
+     */
+    public void loadColumns(ByteBuffer buf) {
+        this.loadColumns(this.info.colAll, buf);
+    }
+
     /**
      * Change pointers to point to data in given buffer, UNLIKE deserialize
      * @param selectedCols positions of column to load

http://git-wip-us.apache.org/repos/asf/kylin/blob/bdf0f69c/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 74f8391..e6c208b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -506,9 +506,19 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
     }
 
     protected void attachSegmentMetadataWithDict(CubeSegment segment, Configuration conf) throws IOException {
+        // Ships both the segment's dictionaries and its statistics resource.
+        attachSegmentMetadata(segment, conf, true, true);
+    }
+
+    /**
+     * Collects the cube metadata of {@code segment} and, optionally, its dictionary and statistics
+     * resources. It then hands them to {@code dumpKylinPropsAndMetadata} for this job's {@code conf}.
+     *
+     * @param ifDictIncluded whether to include the segment's dictionary resource paths
+     * @param ifStatsIncluded whether to include the segment's statistics resource path
+     */
+    protected void attachSegmentMetadata(CubeSegment segment, Configuration conf, boolean ifDictIncluded,
+            boolean ifStatsIncluded) throws IOException {
         Set<String> dumpList = new LinkedHashSet<>();
         dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segment.getCubeInstance()));
-        dumpList.addAll(segment.getDictionaryPaths());
+        if (ifDictIncluded) {
+            dumpList.addAll(segment.getDictionaryPaths());
+        }
+        if (ifStatsIncluded) {
+            dumpList.add(segment.getStatisticsResourcePath());
+        }
         dumpKylinPropsAndMetadata(segment.getProject(), dumpList, segment.getConfig(), conf);
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/bdf0f69c/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 25a67f9..aaf2654 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -29,6 +29,7 @@ public interface BatchConstants {
      * ConFiGuration entry names for MR jobs
      */
 
+    String CFG_CUBOID_MODE = "cuboid.mode";
     String CFG_CUBE_NAME = "cube.name";
     String CFG_CUBE_SEGMENT_NAME = "cube.segment.name";
     String CFG_CUBE_SEGMENT_ID = "cube.segment.id";

http://git-wip-us.apache.org/repos/asf/kylin/blob/bdf0f69c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
index a7b3923..859e126 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -19,133 +19,60 @@
 package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.common.util.MemoryBudgetController;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder;
 import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.cube.inmemcubing.InputConverterUnit;
+import org.apache.kylin.cube.inmemcubing.InputConverterUnitForRawData;
 import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
-import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Maps;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
-/**
- */
-public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArrayWritable, ByteArrayWritable> {
-
-    private static final Logger logger = LoggerFactory.getLogger(InMemCuboidMapper.class);
+/**
+ * In-memory cubing mapper for raw flat-table input: each mapper input value is parsed by the flat
+ * table input format and converted with {@link InputConverterUnitForRawData}.
+ */
+public class InMemCuboidMapper<KEYIN>
+        extends InMemCuboidMapperBase<KEYIN, Object, ByteArrayWritable, ByteArrayWritable, String[]> {
 
-    private CubeInstance cube;
-    private CubeDesc cubeDesc;
-    private CubeSegment cubeSegment;
-    private IMRTableInputFormat flatTableInputFormat;
 
-    private BlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(64);
-    private Future<?> future;
+    private IMRInput.IMRTableInputFormat flatTableInputFormat;
 
     @Override
     protected void doSetup(Context context) throws IOException {
-        super.bindCurrentConfiguration(context.getConfiguration());
+        super.doSetup(context);
 
-        Configuration conf = context.getConfiguration();
-
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
-        String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
-        cube = CubeManager.getInstance(config).getCube(cubeName);
-        cubeDesc = cube.getDescriptor();
-        String segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
-        cubeSegment = cube.getSegmentById(segmentID);
         flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSegment).getFlatTableInputFormat();
-        IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeSegment);
-
-        Map<TblColRef, Dictionary<String>> dictionaryMap = Maps.newHashMap();
-
-        // dictionary
-        for (TblColRef col : cubeDesc.getAllColumnsHaveDictionary()) {
-            Dictionary<?> dict = cubeSegment.getDictionary(col);
-            if (dict == null) {
-                logger.warn("Dictionary for " + col + " was not found.");
-            }
-
-            dictionaryMap.put(col, cubeSegment.getDictionary(col));
-        }
-
-        int taskCount = config.getCubeAlgorithmInMemConcurrentThreads();
-        DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cubeSegment.getCuboidScheduler(), flatDesc,
-                dictionaryMap);
-        cubeBuilder.setReserveMemoryMB(calculateReserveMB(context.getConfiguration()));
-        cubeBuilder.setConcurrentThreads(taskCount);
-
-        ExecutorService executorService = Executors.newSingleThreadExecutor();
-        future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new MapContextGTRecordWriter(context, cubeDesc, cubeSegment)));
-
     }
 
-    private int calculateReserveMB(Configuration configuration) {
-        int sysAvailMB = MemoryBudgetController.getSystemAvailMB();
-        int mrReserve = configuration.getInt("mapreduce.task.io.sort.mb", 100);
-        int sysReserve = Math.max(sysAvailMB / 10, 100);
-        int reserveMB = mrReserve + sysReserve;
-        logger.info("Reserve " + reserveMB + " MB = " + mrReserve + " (MR reserve) + " + sysReserve + " (SYS reserve)");
-        return reserveMB;
+    // cubeDesc, flatDesc and dictionaryMap are presumably initialised by super.doSetup(); the
+    // null checks below guard against calling this earlier. TODO confirm in InMemCuboidMapperBase.
+    @Override
+    protected InputConverterUnit<String[]> getInputConverterUnit() {
+        Preconditions.checkNotNull(cubeDesc);
+        Preconditions.checkNotNull(dictionaryMap);
+        return new InputConverterUnitForRawData(cubeDesc, flatDesc, dictionaryMap);
     }
 
     @Override
-    public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
-        // put each row to the queue
-        Collection<String[]> rowCollection = flatTableInputFormat.parseMapperInput(record);
-
-        for(String[] row: rowCollection) {
-            List<String> rowAsList = Arrays.asList(row);
-            while (!future.isDone()) {
-                if (queue.offer(rowAsList, 1, TimeUnit.SECONDS)) {
-                    break;
-                }
-            }
-        }
+    protected String[] getRecordFromKeyValue(KEYIN key, Object value) {
+        // NOTE(review): only the first row returned by parseMapperInput(value) is used, whereas the
+        // removed doMap() enqueued every row of that collection. Confirm no input format yields
+        // several rows per mapper record; otherwise the extra rows are silently dropped.
+        return flatTableInputFormat.parseMapperInput(value).iterator().next();
     }
 
     @Override
-    protected void doCleanup(Context context) throws IOException, InterruptedException {
-        logger.info("Totally handled " + mapCounter + " records!");
-
-        while (!future.isDone()) {
-            if (queue.offer(Collections.<String> emptyList(), 1, TimeUnit.SECONDS)) {
-                break;
-            }
-        }
-
-        try {
-            future.get();
-        } catch (Exception e) {
-            throw new IOException("Failed to build cube in mapper " + context.getTaskAttemptID().getTaskID().getId(), e);
-        }
-        queue.clear();
+    protected Future getCubingThreadFuture(Context context, Map<TblColRef, Dictionary<String>> dictionaryMap,
+            int reserveMemoryMB, CuboidScheduler cuboidScheduler, InputConverterUnit<String[]> inputConverterUnit) {
+        AbstractInMemCubeBuilder cubeBuilder = new DoggedCubeBuilder(cuboidScheduler, flatDesc, dictionaryMap);
+        cubeBuilder.setReserveMemoryMB(reserveMemoryMB);
+        cubeBuilder.setConcurrentThreads(taskThreadCount);
+
+        // NOTE(review): this executor is never shut down here. Its single worker is a daemon thread,
+        // so it does not keep the JVM alive after the task finishes.
+        ExecutorService executorService = Executors.newSingleThreadExecutor(
+                new ThreadFactoryBuilder().setDaemon(true).setNameFormat("inmemory-cube-building-mapper-%d").build());
+        return executorService.submit(cubeBuilder.buildAsRunnable(queue, inputConverterUnit,
+                new MapContextGTRecordWriter(context, cubeDesc, cubeSegment)));
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/bdf0f69c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
new file mode 100644
index 0000000..43f95e5
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.cuboid.DefaultCuboidScheduler;
+import org.apache.kylin.cube.cuboid.TreeCuboidSchedulerManager;
+import org.apache.kylin.cube.inmemcubing.ConsumeBlockingQueueController;
+import org.apache.kylin.cube.inmemcubing.InputConverterUnit;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CuboidStatsReaderUtil;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Base class of the in-memory cubing mappers. Each map input pair is converted by
+ * {@link #getRecordFromKeyValue} and put on the bounded {@link #queue}, which is drained by the
+ * cubing task whose {@link Future} is returned from {@link #getCubingThreadFuture}. Every
+ * {@code unitRows} records the mapper checks the split row threshold and free memory and, when
+ * either is hit, enqueues {@link InputConverterUnit#getCutUnit()} to cut the current split. In
+ * cleanup the empty unit ({@link InputConverterUnit#getEmptyUnit()}) is enqueued to mark the end
+ * of input, and the mapper waits for the cubing task, rethrowing its failure as an IOException.
+ *
+ * @param <T> type of the unit placed on the queue and handled by the {@link InputConverterUnit}
+ */
+public abstract class InMemCuboidMapperBase<KEYIN, VALUEIN, KEYOUT, VALUEOUT, T> extends KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+
+    private static final Logger logger = LoggerFactory.getLogger(InMemCuboidMapperBase.class);
+
+    /** Free memory (MB) to keep; reaching or dropping below it forces a split cut. */
+    private int reserveMemoryMB;
+    /** 1-based number of the split currently being filled (used for logging). */
+    private int nSplit = 1;
+    /** Records offered since the last split cut. */
+    private int countOfLastSplit = 0;
+    /** Records offered in total by this mapper. */
+    private int counter = 0;
+    /** Row count that forces a split cut; Integer.MAX_VALUE effectively disables it. */
+    private int splitRowThreshold = Integer.MAX_VALUE;
+    /** Number of records between two split-cut checks. */
+    private int unitRows = ConsumeBlockingQueueController.DEFAULT_BATCH_SIZE;
+
+    protected CubeInstance cube;
+    protected CubeDesc cubeDesc;
+    protected CubeSegment cubeSegment;
+    protected Map<TblColRef, Dictionary<String>> dictionaryMap;
+    protected IJoinedFlatTableDesc flatDesc;
+
+    protected int taskThreadCount;
+    /** Hand-off queue: filled by the map thread, drained by the cubing task. */
+    protected BlockingQueue<T> queue = new LinkedBlockingQueue<>(2000);
+    protected InputConverterUnit<T> inputConverterUnit;
+    private Future<?> future;
+
+    /** Returns the converter for {@code T}; it is handed to the cubing task and supplies the cut/empty units. */
+    protected abstract InputConverterUnit<T> getInputConverterUnit();
+
+    /** Starts the cubing task that consumes {@link #queue} and returns its future. */
+    protected abstract Future getCubingThreadFuture(Context context, Map<TblColRef, Dictionary<String>> dictionaryMap, int reserveMemoryMB, //
+                                                    CuboidScheduler cuboidScheduler, InputConverterUnit<T> inputConverterUnit);
+
+    /** Converts one map input key/value pair into a queue unit. */
+    protected abstract T getRecordFromKeyValue(KEYIN key, VALUEIN value);
+
+    /** Loads cube metadata and dictionaries, picks the cuboid scheduler and starts the cubing task. */
+    @Override
+    protected void doSetup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+
+        Configuration conf = context.getConfiguration();
+
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
+        cube = CubeManager.getInstance(config).getCube(cubeName);
+        cubeDesc = cube.getDescriptor();
+        String segmentID = conf.get(BatchConstants.CFG_CUBE_SEGMENT_ID);
+        cubeSegment = cube.getSegmentById(segmentID);
+        flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
+
+        dictionaryMap = Maps.newHashMap();
+
+        // dictionary: look each one up once; a missing one is only warned about and stored as null
+        for (TblColRef col : cubeDesc.getAllColumnsHaveDictionary()) {
+            Dictionary<String> dict = cubeSegment.getDictionary(col);
+            if (dict == null) {
+                logger.warn("Dictionary for " + col + " was not found.");
+            }
+            dictionaryMap.put(col, dict);
+        }
+
+        // check memory more often if a single row is big; keep >= 1 because it is used as a modulus
+        if (cubeDesc.hasMemoryHungryMeasures()) {
+            unitRows = Math.max(1, unitRows / 10);
+        }
+
+        // prefer the tree scheduler built from segment statistics; fall back to the default scheduler
+        String cuboidModeName = conf.get(BatchConstants.CFG_CUBOID_MODE);
+        CuboidScheduler cuboidScheduler = TreeCuboidSchedulerManager.getTreeCuboidScheduler(cubeDesc, //
+                CuboidStatsReaderUtil.readCuboidStatsFromSegment(cube.getCuboidsByMode(cuboidModeName), cubeSegment));
+        if (cuboidScheduler == null) {
+            cuboidScheduler = new DefaultCuboidScheduler(cubeDesc);
+        }
+
+        taskThreadCount = config.getCubeAlgorithmInMemConcurrentThreads();
+        reserveMemoryMB = calculateReserveMB(conf);
+        inputConverterUnit = getInputConverterUnit();
+        future = getCubingThreadFuture(context, dictionaryMap, reserveMemoryMB, cuboidScheduler, inputConverterUnit);
+    }
+
+    /**
+     * Memory (MB) to keep free: the MR sort buffer ({@code mapreduce.task.io.sort.mb}, default 100)
+     * plus max(10% of the currently available memory, 100).
+     */
+    private int calculateReserveMB(Configuration configuration) {
+        int sysAvailMB = MemoryBudgetController.getSystemAvailMB();
+        int mrReserve = configuration.getInt("mapreduce.task.io.sort.mb", 100);
+        int sysReserve = Math.max(sysAvailMB / 10, 100);
+        int reserveMB = mrReserve + sysReserve;
+        logger.info("Reserve " + reserveMB + " MB = " + mrReserve + " (MR reserve) + " + sysReserve + " (SYS reserve)");
+        return reserveMB;
+    }
+
+    /** Queues one converted record and, every {@code unitRows} records, possibly a split cut unit. */
+    @Override
+    public void doMap(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException {
+        // put each row to the queue
+        T row = getRecordFromKeyValue(key, value);
+
+        if (!offer(context, row, 1, TimeUnit.MINUTES, 60)) {
+            throw new IOException("Failed to offer row to internal queue due to queue full!");
+        }
+        counter++;
+        countOfLastSplit++;
+        if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
+            logger.info("Handled " + counter + " records, internal queue size = " + queue.size());
+        }
+
+        if (counter % unitRows == 0 && shouldCutSplit(nSplit, countOfLastSplit)) {
+            if (!offer(context, inputConverterUnit.getCutUnit(), 1, TimeUnit.MINUTES, 60)) {
+                throw new IOException("Failed to offer row to internal queue due to queue full!");
+            }
+            countOfLastSplit = 0;
+            nSplit++;
+        }
+    }
+
+    /** Queues the end-of-input (empty) unit, waits for the cubing task and surfaces its failure. */
+    @Override
+    protected void doCleanup(Context context) throws IOException, InterruptedException {
+        logger.info("Totally handled " + mapCounter + " records!");
+
+        // stop retrying if the cubing task already finished (e.g. failed); futureGet reports the cause
+        while (!future.isDone()) {
+            if (queue.offer(inputConverterUnit.getEmptyUnit(), 1, TimeUnit.SECONDS)) {
+                break;
+            }
+        }
+
+        futureGet(context);
+        queue.clear();
+    }
+
+    /** Returns true when the current split hit {@code splitRowThreshold} rows or free memory hit the reserve. */
+    private boolean shouldCutSplit(int nSplit, long splitRowCount) {
+        int systemAvailMB = MemoryBudgetController.getSystemAvailMB();
+
+        logger.info(splitRowCount + " records went into split #" + nSplit + "; " + systemAvailMB + " MB left, " + reserveMemoryMB + " MB threshold");
+
+        if (splitRowCount >= splitRowThreshold) {
+            logger.info("Split cut due to hitting splitRowThreshold " + splitRowThreshold);
+            return true;
+        }
+
+        if (systemAvailMB <= reserveMemoryMB) {
+            logger.info("Split cut due to hitting memory threshold, system avail " + systemAvailMB + " MB <= reserve " + reserveMemoryMB + " MB");
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Makes up to {@code nRound} timed attempts to queue {@code row}. Between attempts it fails fast if the
+     * cubing task has already finished, since nothing would drain the queue any more.
+     *
+     * @return true if the row was queued, false if every attempt timed out
+     * @throws IOException if the cubing task has already finished
+     */
+    private boolean offer(Context context, T row, long timeout, TimeUnit unit, int nRound) throws IOException, InterruptedException {
+        while (nRound > 0) {
+            if (queue.offer(row, timeout, unit)) {
+                return true;
+            }
+            if (future.isDone()) {
+                futureGet(context);
+                throw new IOException("Failed to build cube in mapper due to cubing thread exit unexpectedly");
+            }
+            nRound--;
+        }
+        return false;
+    }
+
+    /** Waits for the cubing task and rethrows any failure or interruption as an {@link IOException}. */
+    private void futureGet(Context context) throws IOException {
+        try {
+            future.get();
+        } catch (Exception e) {
+            if (e instanceof InterruptedException) {
+                // future.get() cleared the interrupt flag; restore it so callers can still observe it
+                Thread.currentThread().interrupt();
+            }
+            throw new IOException("Failed to build cube in mapper " + context.getTaskAttemptID().getTaskID().getId(), e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/bdf0f69c/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
index 5e5b16a..fa2d792 100644
--- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
@@ -19,7 +19,6 @@
 package org.apache.kylin.cube.inmemcubing;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
@@ -79,7 +78,7 @@ public class ITDoggedCubeBuilderStressTest extends LocalFileMetadataTestCase {
     @Test
     public void test() throws Exception {
 
-        ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
+        ArrayBlockingQueue<String[]> queue = new ArrayBlockingQueue<String[]>(1000);
         ExecutorService executorService = Executors.newSingleThreadExecutor();
         long randSeed = System.currentTimeMillis();
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/bdf0f69c/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
index 1e10d79..0338da8 100644
--- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
@@ -26,7 +26,6 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.PrintWriter;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
@@ -83,7 +82,7 @@ public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase {
     @Test
     public void test() throws Exception {
 
-        ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
+        ArrayBlockingQueue<String[]> queue = new ArrayBlockingQueue<String[]>(1000);
         ExecutorService executorService = Executors.newSingleThreadExecutor();
         long randSeed = System.currentTimeMillis();
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/bdf0f69c/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
index 49d267a..ad754cd 100644
--- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
@@ -113,7 +113,7 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase {
         //DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
         cubeBuilder.setConcurrentThreads(nThreads);
 
-        ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
+        ArrayBlockingQueue<String[]> queue = new ArrayBlockingQueue<String[]>(1000);
         ExecutorService executorService = Executors.newSingleThreadExecutor();
 
         try {
@@ -144,11 +144,13 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase {
         }
     }
 
-    static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count) throws IOException, InterruptedException {
+    static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<String[]> queue, int count)
+            throws IOException, InterruptedException { // delegates to the 5-arg overload, passing randSeed = 0
         feedData(cube, flatTable, queue, count, 0);
     }
 
-    static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count, long randSeed) throws IOException, InterruptedException {
+    static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<String[]> queue, int count,
+            long randSeed) throws IOException, InterruptedException {
         IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cube.getDescriptor());
         int nColumns = flatDesc.getAllColumns().size();
 
@@ -177,14 +179,14 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase {
 
         // output with random data
         for (; count > 0; count--) {
-            ArrayList<String> row = new ArrayList<String>(nColumns);
+            String[] row = new String[nColumns];
             for (int i = 0; i < nColumns; i++) {
                 String[] candidates = distincts.get(i);
-                row.add(candidates[rand.nextInt(candidates.length)]);
+                row[i] = candidates[rand.nextInt(candidates.length)];
             }
             queue.put(row);
         }
-        queue.put(new ArrayList<String>(0));
+        queue.put(InputConverterUnitForRawData.EMPTY_ROW);
     }
 
     static Map<TblColRef, Dictionary<String>> getDictionaryMap(CubeInstance cube, String flatTable) throws IOException {