You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/12/12 10:55:40 UTC

[kylin] branch master updated: APACHE-KYLIN-2932: Simplify the thread model for in-memory cubing

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new bef9e5c  APACHE-KYLIN-2932: Simplify the thread model for in-memory cubing
bef9e5c is described below

commit bef9e5c13b48f150145327a2648a5cbbd0d425c2
Author: U-CORP\mingmwang <mi...@D-SHC-00437006.corp.ebay.com>
AuthorDate: Thu Oct 12 17:05:52 2017 +0800

    APACHE-KYLIN-2932: Simplify the thread model for in-memory cubing
---
 .../org/apache/kylin/common/KylinConfigBase.java   |   4 +
 .../cube/inmemcubing/CompoundCuboidWriter.java     |   8 +
 .../kylin/cube/inmemcubing/DoggedCubeBuilder.java  |   4 +-
 .../cube/inmemcubing/ICuboidGTTableWriter.java     |  47 +++
 .../kylin/cube/inmemcubing/ICuboidWriter.java      |   3 +
 .../kylin/cube/inmemcubing/InMemCubeBuilder.java   |  18 +-
 .../apache/kylin/cube/inmemcubing2/CuboidTask.java |  53 +++
 .../DefaultCuboidCollectorWithCallBack.java        |  53 +++
 .../cube/inmemcubing2/DoggedCubeBuilder2.java      | 442 +++++++++++++++++++++
 .../ICuboidCollectorWithCallBack.java}             |  62 ++-
 .../ICuboidResultListener.java}                    |  59 ++-
 .../kylin/cube/inmemcubing2/InMemCubeBuilder2.java | 408 +++++++++++++++++++
 .../mr/steps/InMemCuboidFromBaseCuboidMapper.java  |  25 +-
 .../kylin/engine/mr/steps/InMemCuboidMapper.java   |  23 +-
 .../engine/mr/steps/InMemCuboidMapperBase.java     |  30 +-
 .../kylin/engine/mr/steps/KVGTRecordWriter.java    |   4 +-
 .../inmemcubing/ITDoggedCubeBuilderStressTest.java |   6 +
 .../cube/inmemcubing/ITDoggedCubeBuilderTest.java  |  73 +++-
 .../cube/inmemcubing/ITInMemCubeBuilderTest.java   |   7 +
 19 files changed, 1196 insertions(+), 133 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 193329b..018552c 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -808,6 +808,10 @@ abstract public class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.job.cube-auto-ready-enabled", TRUE));
     }
 
+    public String getCubeInMemBuilderClass() {
+        return getOptional("kylin.job.cube-inmem-builder-class", "org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder");
+    }
+
     // ============================================================================
     // SOURCE.HIVE
     // ============================================================================
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java
index c82f418..df77978 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java
@@ -21,6 +21,7 @@ package org.apache.kylin.cube.inmemcubing;
 import java.io.IOException;
 
 import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GridTable;
 
 /**
  */
@@ -39,6 +40,13 @@ public class CompoundCuboidWriter implements ICuboidWriter {
             writer.write(cuboidId, record);
         }
     }
+    
+    @Override
+    public void write(long cuboidId, GridTable table) throws IOException {
+        for (ICuboidWriter writer : cuboidWriters) {
+            writer.write(cuboidId, table);
+        }
+    }
 
     @Override
     public void flush() throws IOException {
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
index 8368051..39dce26 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
@@ -24,9 +24,9 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.PriorityQueue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentNavigableMap;
 
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Dictionary;
@@ -181,7 +181,7 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
         final RecordConsumeBlockingQueueController<?> inputController;
         final InMemCubeBuilder builder;
 
-        ConcurrentNavigableMap<Long, CuboidResult> buildResult;
+        NavigableMap<Long, CuboidResult> buildResult;
         RuntimeException exception;
 
         public SplitThread(final int num, final RecordConsumeBlockingQueueController<?> inputController) {
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidGTTableWriter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidGTTableWriter.java
new file mode 100755
index 0000000..93a7994
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidGTTableWriter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+
+package org.apache.kylin.cube.inmemcubing;
+
+import java.io.IOException;
+
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
+import org.apache.kylin.gridtable.GridTable;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class ICuboidGTTableWriter implements ICuboidWriter {
+    private static final Logger logger = LoggerFactory.getLogger(ICuboidGTTableWriter.class);
+
+    /** Writes each record of a scan built with null ranges/dimensions/filter; the scanner is closed even if a write fails. */
+    @Override
+    public void write(long cuboidId, GridTable gridTable) throws IOException {
+        long startTime = System.currentTimeMillis();
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(gridTable.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest();
+        try (IGTScanner scanner = gridTable.scan(req)) {
+            for (GTRecord record : scanner) {
+                write(cuboidId, record);
+            }
+        }
+        logger.info("Cuboid " + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
+    }
+}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
index 3f6cb0c..4ae182e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
@@ -21,6 +21,7 @@ package org.apache.kylin.cube.inmemcubing;
 import java.io.IOException;
 
 import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GridTable;
 
 /**
  */
@@ -28,6 +29,8 @@ public interface ICuboidWriter {
 
     void write(long cuboidId, GTRecord record) throws IOException;
 
+    void write(long cuboidId, GridTable table) throws IOException;
+
     void flush() throws IOException;
 
     void close() throws IOException;
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index e0bdb20..9661fa8 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -19,14 +19,13 @@
 package org.apache.kylin.cube.inmemcubing;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.TreeSet;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -47,6 +46,7 @@ import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.GTScanRequestBuilder;
 import org.apache.kylin.gridtable.GridTable;
 import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.gridtable.IGTStore;
 import org.apache.kylin.measure.topn.Counter;
 import org.apache.kylin.measure.topn.TopNCounter;
 import org.apache.kylin.metadata.datatype.DoubleMutable;
@@ -111,7 +111,10 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
                 new CubeDimEncMap(cubeDesc, dictionaryMap)
         );
 
-        ConcurrentDiskStore store = new ConcurrentDiskStore(info);
+        // The store implementations below perform very similarly; ConcurrentDiskStore is the simplest.
+        // MemDiskStore store = new MemDiskStore(info, memBudget == null ? MemoryBudgetController.ZERO_BUDGET : memBudget);
+        // MemDiskStore store = new MemDiskStore(info, MemoryBudgetController.ZERO_BUDGET);
+        IGTStore store = new ConcurrentDiskStore(info);
 
         GridTable gridTable = new GridTable(info, store);
         return gridTable;
@@ -120,7 +123,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
     @Override
     public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit, ICuboidWriter output)
             throws IOException {
-        ConcurrentNavigableMap<Long, CuboidResult> result = build(
+        NavigableMap<Long, CuboidResult> result = build(
                 RecordConsumeBlockingQueueController.getQueueController(inputConverterUnit, input));
         try {
             for (CuboidResult cuboidResult : result.values()) {
@@ -132,9 +135,9 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         }
     }
 
-    public <T> ConcurrentNavigableMap<Long, CuboidResult> build(RecordConsumeBlockingQueueController<T> input)
+    public <T> NavigableMap<Long, CuboidResult> build(RecordConsumeBlockingQueueController<T> input)
             throws IOException {
-        final ConcurrentNavigableMap<Long, CuboidResult> result = new ConcurrentSkipListMap<Long, CuboidResult>();
+        final NavigableMap<Long, CuboidResult> result = new ConcurrentSkipListMap<Long, CuboidResult>();
         build(input, new ICuboidCollector() {
             @Override
             public void collect(CuboidResult cuboidResult) {
@@ -213,7 +216,8 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
     }
 
     private void throwExceptionIfAny() throws IOException {
-        ArrayList<Throwable> errors = new ArrayList<>();
+        List<Throwable> errors = Lists.newArrayList();
+
         for (int i = 0; i < taskThreadCount; i++) {
             Throwable t = taskThreadExceptions[i];
             if (t != null)
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/CuboidTask.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/CuboidTask.java
new file mode 100755
index 0000000..cf54eb6
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/CuboidTask.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+
+package org.apache.kylin.cube.inmemcubing2;
+
+import java.io.IOException;
+import java.util.concurrent.RecursiveTask;
+
+import org.apache.kylin.cube.inmemcubing.CuboidResult;
+
+/** Fork/join task that builds one child cuboid; tasks order by child cuboid id (overflow-safe via Long.compare). */
+@SuppressWarnings("serial")
+class CuboidTask extends RecursiveTask<CuboidResult> implements Comparable<CuboidTask> {
+    final CuboidResult parent;
+    final long childCuboidId;
+    final InMemCubeBuilder2 cubeBuilder;
+
+    CuboidTask(CuboidResult parent, long childCuboidId, InMemCubeBuilder2 cubeBuilder) {
+        this.parent = parent;
+        this.childCuboidId = childCuboidId;
+        this.cubeBuilder = cubeBuilder;
+    }
+
+    @Override
+    public int compareTo(CuboidTask o) {
+        return Long.compare(this.childCuboidId, o.childCuboidId);
+    }
+
+    @Override
+    protected CuboidResult compute() {
+        try {
+            return cubeBuilder.buildCuboid(this);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/DefaultCuboidCollectorWithCallBack.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/DefaultCuboidCollectorWithCallBack.java
new file mode 100755
index 0000000..d7f738d
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/DefaultCuboidCollectorWithCallBack.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.inmemcubing2;
+
+import java.util.NavigableMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.apache.kylin.cube.inmemcubing.CuboidResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Collects cuboid results in a map keyed by cuboid id and passes each one to the listener when it is non-null. */
+public class DefaultCuboidCollectorWithCallBack implements ICuboidCollectorWithCallBack{
+    private static Logger logger = LoggerFactory.getLogger(DefaultCuboidCollectorWithCallBack.class);
+    
+    final ConcurrentNavigableMap<Long, CuboidResult> result = new ConcurrentSkipListMap<Long, CuboidResult>();
+    final ICuboidResultListener listener;
+    
+    public DefaultCuboidCollectorWithCallBack(ICuboidResultListener listener){
+        this.listener = listener;
+    }
+
+    @Override
+    public void collectAndNotify(CuboidResult cuboidResult) {
+        logger.info("collecting CuboidResult cuboid id:" + cuboidResult.cuboidId);
+        result.put(cuboidResult.cuboidId, cuboidResult);
+        if (listener != null) {
+            listener.finish(cuboidResult);
+        }
+    }
+
+    @Override
+    public NavigableMap<Long, CuboidResult> getAllResult() {
+        return result;
+    }
+}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/DoggedCubeBuilder2.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/DoggedCubeBuilder2.java
new file mode 100755
index 0000000..4c5da87
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/DoggedCubeBuilder2.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.inmemcubing2;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
+import java.util.concurrent.ForkJoinWorkerThread;
+import java.util.concurrent.RecursiveTask;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder;
+import org.apache.kylin.cube.inmemcubing.CuboidResult;
+import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
+import org.apache.kylin.cube.inmemcubing.InputConverterUnit;
+import org.apache.kylin.cube.inmemcubing.RecordConsumeBlockingQueueController;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
+import org.apache.kylin.gridtable.GridTable;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.measure.MeasureAggregators;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+
+public class DoggedCubeBuilder2 extends AbstractInMemCubeBuilder {
+    private static Logger logger = LoggerFactory.getLogger(DoggedCubeBuilder2.class);
+
+    public DoggedCubeBuilder2(CuboidScheduler cuboidScheduler, IJoinedFlatTableDesc flatDesc,
+            Map<TblColRef, Dictionary<String>> dictionaryMap) {
+        super(cuboidScheduler, flatDesc, dictionaryMap);
+    }
+
+    @Override
+    public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit, ICuboidWriter output)
+            throws IOException {
+        new BuildOnce().build(input, inputConverterUnit, output);
+    }
+
+    private class BuildOnce {
+        public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit, ICuboidWriter output)
+                throws IOException {
+            final RecordConsumeBlockingQueueController<T> inputController = RecordConsumeBlockingQueueController
+                    .getQueueController(inputConverterUnit, input);
+
+            final List<InMemCubeBuilder2> builderList = new CopyOnWriteArrayList<>();
+
+            ForkJoinWorkerThreadFactory factory = new ForkJoinWorkerThreadFactory() {
+                @Override
+                public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
+                    final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
+                    worker.setName("dogged-cubing-cuboid-worker-" + worker.getPoolIndex());
+                    return worker;
+                }
+            };
+            // One pool runs the base-cuboid tasks and the child-cuboid kick-off jobs; the last argument is asyncMode.
+            ForkJoinPool builderPool = new ForkJoinPool(taskThreadCount, factory, null, true);
+            CuboidResultWatcher resultWatcher = new CuboidResultWatcher(builderList, output);
+
+            Stopwatch sw = new Stopwatch().start();
+            logger.info("Dogged Cube Build2 start");
+            try {
+                BaseCuboidTask<T> task = new BaseCuboidTask<>(inputController, 1, resultWatcher);
+                builderPool.execute(task);
+                do {
+                    builderList.add(task.getInternalBuilder());
+                    // join() rethrows whatever made this split's base cuboid build fail
+                    task.join();
+                    task = task.nextTask();
+                } while (task != null);
+                // Every split now has its base cuboid: start the child cuboids, then merge results as they arrive.
+                logger.info("Has finished feeding data, and base cuboid built, start to build child cuboids");
+                for (final InMemCubeBuilder2 builder : builderList) {
+                    builderPool.submit(new Runnable() {
+                        @Override
+                        public void run() {
+                            builder.startBuildFromBaseCuboid();
+                        }
+                    });
+                }
+                resultWatcher.start();
+                logger.info("Dogged Cube Build2 splits complete, took " + sw.elapsedMillis() + " ms");
+            } catch (Error | RuntimeException e) {
+                logger.error("Dogged Cube Build2 error", e);
+                throw e;
+            } catch (Throwable e) {
+                logger.error("Dogged Cube Build2 error", e);
+                throw new IOException(e);
+            } finally {
+                try { // close the writer first, but never let its failure skip the cleanup below
+                    output.close();
+                } finally {
+                    closeGirdTables(builderList);
+                    sw.stop();
+                    builderPool.shutdownNow();
+                    logger.info("Dogged Cube Build2 end, totally took " + sw.elapsedMillis() + " ms");
+                    logger.info("Dogged Cube Build2 return");
+                }
+            }
+        }
+
+        private void closeGirdTables(List<InMemCubeBuilder2> builderList) {
+            for (InMemCubeBuilder2 inMemCubeBuilder : builderList) {
+                for (CuboidResult cuboidResult : inMemCubeBuilder.getResultCollector().getAllResult().values()) {
+                    closeGirdTable(cuboidResult.table);
+                }
+            }
+        }
+
+        private void closeGirdTable(GridTable gridTable) {
+            try {
+                gridTable.close();
+            } catch (Throwable e) {
+                logger.error("Error closing grid table " + gridTable, e);
+            }
+        }
+    }
+
+    /** Builds the base cuboid for one input split and, if the input has not ended afterwards, forks the next split. */
+    private class BaseCuboidTask<T> extends RecursiveTask<CuboidResult> {
+        private static final long serialVersionUID = -5408592502260876799L;
+
+        private final int splitSeq;
+        private final ICuboidResultListener resultListener;
+        private RecordConsumeBlockingQueueController<T> inputController;
+        private InMemCubeBuilder2 builder;
+        // assigned inside compute(); stays null when the input had already ended
+        private volatile BaseCuboidTask<T> next;
+
+        public BaseCuboidTask(final RecordConsumeBlockingQueueController<T> inputController, int splitSeq,
+                ICuboidResultListener resultListener) {
+            this.splitSeq = splitSeq;
+            this.resultListener = resultListener;
+            this.inputController = inputController;
+            this.builder = new InMemCubeBuilder2(cuboidScheduler, flatDesc, dictionaryMap);
+            this.builder.setReserveMemoryMB(reserveMemoryMB);
+            this.builder.setConcurrentThreads(taskThreadCount);
+            logger.info("Split #" + splitSeq + " kickoff");
+        }
+
+        @Override
+        protected CuboidResult compute() {
+            try {
+                final CuboidResult baseResult = builder.buildBaseCuboid(inputController, resultListener);
+                if (!inputController.ifEnd()) {
+                    next = new BaseCuboidTask<>(inputController, splitSeq + 1, resultListener);
+                    next.fork();
+                }
+                logger.info("Split #" + splitSeq + " finished");
+                return baseResult;
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        public InMemCubeBuilder2 getInternalBuilder() {
+            return this.builder;
+        }
+
+        public BaseCuboidTask<T> nextTask() {
+            return this.next;
+        }
+    }
+
+    /**
+     * Receives finished cuboid results from every split, merges the per-split results of a cuboid once all splits
+     * have delivered it, writes them to the output, and rethrows failures of completed child tasks.
+     */
+    private class CuboidResultWatcher implements ICuboidResultListener {
+        final BlockingQueue<CuboidResult> outputQueue;
+        final Map<Long, List<CuboidResult>> pendingQueue = Maps.newHashMap();
+        final List<InMemCubeBuilder2> builderList;
+        final ICuboidWriter output;
+
+        public CuboidResultWatcher(final List<InMemCubeBuilder2> builderList, final ICuboidWriter output) {
+            this.outputQueue = Queues.newLinkedBlockingQueue();
+            this.builderList = builderList;
+            this.output = output;
+        }
+
+        public void start() throws IOException {
+            SplitMerger merger = new SplitMerger();
+            while (true) {
+                if (!outputQueue.isEmpty()) {
+                    List<CuboidResult> splitResultReturned = Lists.newArrayList();
+                    outputQueue.drainTo(splitResultReturned);
+                    for (CuboidResult splitResult : splitResultReturned) {
+                        if (builderList.size() == 1) {
+                            merger.mergeAndOutput(Lists.newArrayList(splitResult), output);
+                        } else {
+                            List<CuboidResult> cuboidResultList = pendingQueue.get(splitResult.cuboidId);
+                            if (cuboidResultList == null) {
+                                cuboidResultList = Lists.newArrayListWithExpectedSize(builderList.size());
+                                pendingQueue.put(splitResult.cuboidId, cuboidResultList);
+                            }
+                            cuboidResultList.add(splitResult);
+                            if (cuboidResultList.size() == builderList.size()) { // every split has delivered this cuboid
+                                merger.mergeAndOutput(cuboidResultList, output);
+                                pendingQueue.remove(splitResult.cuboidId);
+                            }
+                        }
+                    }
+                }
+
+                boolean jobFinished = isAllBuildFinished();
+                if (outputQueue.isEmpty() && !jobFinished) {
+                    boolean ifWait = true;
+                    for (InMemCubeBuilder2 builder : builderList) {
+                        Queue<CuboidTask> queue = builder.getCompletedTaskQueue();
+                        while (queue.size() > 0) {
+                            CuboidTask childTask = queue.poll();
+                            if (childTask.isCompletedAbnormally()) {
+                                throw new RuntimeException(childTask.getException());
+                            }
+                            ifWait = false;
+                        }
+                    }
+                    if (ifWait) { // no task completed during this pass, so back off before polling again
+                        try {
+                            Thread.sleep(100L);
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
+                            throw new RuntimeException(e);
+                        }
+                    }
+                } else if (outputQueue.isEmpty() && pendingQueue.isEmpty() && jobFinished) {
+                    return;
+                }
+            }
+        }
+
+        private boolean isAllBuildFinished() {
+            for (InMemCubeBuilder2 split : builderList) {
+                if (!split.isAllCuboidDone()) {
+                    return false;
+                }
+            }
+            return true;
+        }
+
+        @Override
+        public void finish(CuboidResult result) {
+            Stopwatch stopwatch = new Stopwatch().start();
+            int nRetries = 0;
+            while (!outputQueue.offer(result)) {
+                nRetries++;
+                long sleepTime = stopwatch.elapsedMillis();
+                if (sleepTime > 3600000L) {
+                    stopwatch.stop();
+                    throw new RuntimeException(
+                            "OutputQueue Full. Cannot offer to the output queue after waiting for one hour!!! Current queue size: "
+                                    + outputQueue.size());
+                }
+                logger.warn("OutputQueue Full. Queue size: " + outputQueue.size() + ". Total sleep time : " + sleepTime
+                        + ", and retry count : " + nRetries);
+                try {
+                    Thread.sleep(5000L);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt(); // keep the interrupt visible to the pool thread
+                    throw new RuntimeException(e);
+                }
+            }
+            stopwatch.stop();
+        }
+    }
+
+    private class SplitMerger { // merges the per-split results of one cuboid and writes them to the output
+        MeasureAggregators reuseAggrs;
+        Object[] reuseMetricsArray;
+        ByteArray reuseMetricsSpace;
+        // memo of the metrics column set computed for the most recent column count (see getMetricsColumns)
+        long lastCuboidColumnCount;
+        ImmutableBitSet lastMetricsColumns;
+
+        SplitMerger() {
+            reuseAggrs = new MeasureAggregators(cubeDesc.getMeasures());
+            reuseMetricsArray = new Object[cubeDesc.getMeasures().size()];
+        }
+
+        public void mergeAndOutput(List<CuboidResult> splitResultList, ICuboidWriter output) throws IOException {
+            if (splitResultList.size() == 1) { // a single split needs no merging; output its table directly
+                CuboidResult cuboidResult = splitResultList.get(0);
+                outputCuboid(cuboidResult.cuboidId, cuboidResult.table, output);
+                return;
+            }
+            LinkedList<ResultMergeSlot> open = Lists.newLinkedList();
+            for (CuboidResult splitResult : splitResultList) {
+                open.add(new ResultMergeSlot(splitResult));
+            }
+
+            PriorityQueue<ResultMergeSlot> heap = new PriorityQueue<ResultMergeSlot>();
+            while (true) {
+                // fetch the next record of every open slot; slots that still have one go onto the heap
+                while (!open.isEmpty()) {
+                    ResultMergeSlot slot = open.removeFirst();
+                    if (slot.fetchNext()) {
+                        heap.add(slot);
+                    }
+                }
+
+                // take the head of the heap; an empty heap means every slot is exhausted
+                ResultMergeSlot smallest = heap.poll();
+                if (smallest == null)
+                    break;
+                open.add(smallest);
+
+                // aggregate the metrics of all slots whose current record has the same key
+                if (smallest.isSameKey(heap.peek())) {
+                    Object[] metrics = getMetricsValues(smallest.currentRecord);
+                    reuseAggrs.reset();
+                    reuseAggrs.aggregate(metrics);
+                    do {
+                        ResultMergeSlot slot = heap.poll();
+                        open.add(slot); // its current record is consumed here, so it must fetch again
+                        metrics = getMetricsValues(slot.currentRecord);
+                        reuseAggrs.aggregate(metrics);
+                    } while (smallest.isSameKey(heap.peek()));
+
+                    reuseAggrs.collectStates(metrics);
+                    setMetricsValues(smallest.currentRecord, metrics);
+                }
+                output.write(smallest.currentCuboidId, smallest.currentRecord);
+            }
+        }
+
+        private void setMetricsValues(GTRecord record, Object[] metricsValues) {
+            ImmutableBitSet metrics = getMetricsColumns(record);
+
+            if (reuseMetricsSpace == null) { // allocated on first use, then reused for every later record
+                reuseMetricsSpace = new ByteArray(record.getInfo().getMaxColumnLength(metrics));
+            }
+
+            record.setValues(metrics, reuseMetricsSpace, metricsValues);
+        }
+
+        private Object[] getMetricsValues(GTRecord record) {
+            ImmutableBitSet metrics = getMetricsColumns(record);
+            return record.getValues(metrics, reuseMetricsArray);
+        }
+
+        private ImmutableBitSet getMetricsColumns(GTRecord record) {
+            // metrics columns always come after dimension columns
+            if (lastCuboidColumnCount == record.getInfo().getColumnCount())
+                return lastMetricsColumns;
+
+            int to = record.getInfo().getColumnCount();
+            int from = to - reuseMetricsArray.length; // the last reuseMetricsArray.length columns are the metrics
+            lastCuboidColumnCount = record.getInfo().getColumnCount();
+            lastMetricsColumns = new ImmutableBitSet(from, to);
+            return lastMetricsColumns;
+        }
+    }
+
+    /**
+     * Cursor over the records of one split's {@link CuboidResult}. Slots are ordered by cuboid id
+     * first and then by the primary-key columns of their current record, so they can be kept in a
+     * priority queue while merging.
+     */
+    private static class ResultMergeSlot implements Comparable<ResultMergeSlot> {
+        CuboidResult splitResult;
+        IGTScanner scanner;
+        Iterator<GTRecord> recordIterator;
+
+        long currentCuboidId;
+        GTRecord currentRecord;
+
+        public ResultMergeSlot(CuboidResult splitResult) {
+            this.splitResult = splitResult;
+        }
+
+        /**
+         * Moves to the next record, opening an unfiltered scan over the split's table on first use.
+         *
+         * @return {@code true} if {@code currentRecord} holds a new record; {@code false} once the
+         *         scan is exhausted, in which case the scanner has been closed
+         * @throws IOException if opening or closing the scanner fails
+         */
+        public boolean fetchNext() throws IOException {
+            if (recordIterator == null) {
+                currentCuboidId = splitResult.cuboidId;
+                scanner = splitResult.table.scan(new GTScanRequestBuilder().setInfo(splitResult.table.getInfo())
+                        .setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest());
+                recordIterator = scanner.iterator();
+            }
+
+            if (recordIterator.hasNext()) {
+                currentRecord = recordIterator.next();
+                return true;
+            } else {
+                scanner.close();
+                // note: a further call would start a fresh scan because the iterator is reset here
+                recordIterator = null;
+                return false;
+            }
+        }
+
+        /**
+         * Orders by cuboid id, then column by column over the primary key of the current records.
+         */
+        @Override
+        public int compareTo(ResultMergeSlot o) {
+            // Long.compare instead of subtraction: the sign of a long difference is wrong on overflow
+            int cuboidComp = Long.compare(this.currentCuboidId, o.currentCuboidId);
+            if (cuboidComp != 0)
+                return cuboidComp;
+
+            // note GTRecord.equals() doesn't work here because the two GTRecords come from different GridTables
+            ImmutableBitSet pk = this.currentRecord.getInfo().getPrimaryKey();
+            for (int i = 0; i < pk.trueBitCount(); i++) {
+                int c = pk.trueBitAt(i);
+                int comp = this.currentRecord.get(c).compareTo(o.currentRecord.get(c));
+                if (comp != 0)
+                    return comp;
+            }
+            return 0;
+        }
+
+        /**
+         * @return {@code true} if {@code o} is non-null and compares equal to this slot
+         */
+        public boolean isSameKey(ResultMergeSlot o) {
+            if (o == null)
+                return false;
+            else
+                return this.compareTo(o) == 0;
+        }
+
+    }
+}
\ No newline at end of file
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/ICuboidCollectorWithCallBack.java
old mode 100644
new mode 100755
similarity index 72%
copy from core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
copy to core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/ICuboidCollectorWithCallBack.java
index 3f6cb0c..b669bbe
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/ICuboidCollectorWithCallBack.java
@@ -1,34 +1,28 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.cube.inmemcubing;
-
-import java.io.IOException;
-
-import org.apache.kylin.gridtable.GTRecord;
-
-/**
- */
-public interface ICuboidWriter {
-
-    void write(long cuboidId, GTRecord record) throws IOException;
-
-    void flush() throws IOException;
-
-    void close() throws IOException;
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.inmemcubing2;
+
+import java.util.NavigableMap;
+
+import org.apache.kylin.cube.inmemcubing.CuboidResult;
+
+/**
+ * Collects {@link CuboidResult}s produced during in-memory cubing and exposes them as a sorted map.
+ */
+public interface ICuboidCollectorWithCallBack {
+    /**
+     * Records a finished cuboid result. Judging by the name, implementations also notify a
+     * callback - confirm against the implementing class.
+     */
+    void collectAndNotify(CuboidResult result);
+    /**
+     * @return all results collected so far; the {@code Long} key is presumably the cuboid id - confirm
+     */
+    NavigableMap<Long, CuboidResult> getAllResult();
+}
\ No newline at end of file
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/ICuboidResultListener.java
old mode 100644
new mode 100755
similarity index 72%
copy from core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
copy to core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/ICuboidResultListener.java
index 3f6cb0c..6d80f00
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/ICuboidResultListener.java
@@ -1,34 +1,25 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.cube.inmemcubing;
-
-import java.io.IOException;
-
-import org.apache.kylin.gridtable.GTRecord;
-
-/**
- */
-public interface ICuboidWriter {
-
-    void write(long cuboidId, GTRecord record) throws IOException;
-
-    void flush() throws IOException;
-
-    void close() throws IOException;
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.inmemcubing2;
+
+import org.apache.kylin.cube.inmemcubing.CuboidResult;
+
+/**
+ * Callback that receives a {@link CuboidResult}.
+ */
+public interface ICuboidResultListener {
+    /**
+     * Called with a cuboid result; presumably once that cuboid has been fully built - confirm against callers.
+     */
+    void finish(CuboidResult cuboidResult);
+}
\ No newline at end of file
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/InMemCubeBuilder2.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/InMemCubeBuilder2.java
new file mode 100755
index 0000000..35a4d09
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/InMemCubeBuilder2.java
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.inmemcubing2;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
+import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.ForkJoinWorkerThread;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.gridtable.CubeGridTable;
+import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder;
+import org.apache.kylin.cube.inmemcubing.ConcurrentDiskStore;
+import org.apache.kylin.cube.inmemcubing.CuboidResult;
+import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
+import org.apache.kylin.cube.inmemcubing.InMemCubeBuilderUtils;
+import org.apache.kylin.cube.inmemcubing.InputConverter;
+import org.apache.kylin.cube.inmemcubing.InputConverterUnit;
+import org.apache.kylin.cube.inmemcubing.RecordConsumeBlockingQueueController;
+import org.apache.kylin.cube.kv.CubeDimEncMap;
+import org.apache.kylin.gridtable.GTAggregateScanner;
+import org.apache.kylin.gridtable.GTBuilder;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
+import org.apache.kylin.gridtable.GridTable;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.gridtable.IGTStore;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+
+/**
+ * Build a cube (many cuboids) in memory. Calculating multiple cuboids at the same time as long as memory permits.
+ * Assumes base cuboid fits in memory or otherwise OOM exception will occur.
+ */
+public class InMemCubeBuilder2 extends AbstractInMemCubeBuilder {
+    private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder2.class);
+
+    // by experience
+    private static final double DERIVE_AGGR_CACHE_CONSTANT_FACTOR = 0.1;
+    private static final double DERIVE_AGGR_CACHE_VARIABLE_FACTOR = 0.9;
+
+    protected final String[] metricsAggrFuncs;
+    protected final MeasureDesc[] measureDescs;
+    protected final int measureCount;
+
+    private MemoryBudgetController memBudget;
+    protected final long baseCuboidId;
+    private CuboidResult baseResult;
+
+    private Queue<CuboidTask> completedTaskQueue;
+    private AtomicInteger taskCuboidCompleted;
+
+    private ICuboidCollectorWithCallBack resultCollector;
+
+    public InMemCubeBuilder2(CuboidScheduler cuboidScheduler, IJoinedFlatTableDesc flatDesc,
+            Map<TblColRef, Dictionary<String>> dictionaryMap) {
+        super(cuboidScheduler, flatDesc, dictionaryMap);
+        this.measureCount = cubeDesc.getMeasures().size();
+        this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
+        List<String> metricsAggrFuncsList = Lists.newArrayList();
+
+        for (int i = 0; i < measureCount; i++) {
+            MeasureDesc measureDesc = measureDescs[i];
+            metricsAggrFuncsList.add(measureDesc.getFunction().getExpression());
+        }
+        this.metricsAggrFuncs = metricsAggrFuncsList.toArray(new String[metricsAggrFuncsList.size()]);
+        this.baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+    }
+
+    public int getBaseResultCacheMB() {
+        return baseResult.aggrCacheMB;
+    }
+
+    private GridTable newGridTableByCuboidID(long cuboidID) throws IOException {
+        GTInfo info = CubeGridTable.newGTInfo(Cuboid.findForMandatory(cubeDesc, cuboidID),
+                new CubeDimEncMap(cubeDesc, dictionaryMap));
+
+        // Below several store implementation are very similar in performance. The ConcurrentDiskStore is the simplest.
+        // MemDiskStore store = new MemDiskStore(info, memBudget == null ? MemoryBudgetController.ZERO_BUDGET : memBudget);
+        // MemDiskStore store = new MemDiskStore(info, MemoryBudgetController.ZERO_BUDGET);
+        IGTStore store = new ConcurrentDiskStore(info);
+
+        GridTable gridTable = new GridTable(info, store);
+        return gridTable;
+    }
+
+    @Override
+    public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit, ICuboidWriter output)
+            throws IOException {
+        NavigableMap<Long, CuboidResult> result = buildAndCollect(
+                RecordConsumeBlockingQueueController.getQueueController(inputConverterUnit, input), null);
+        try {
+            for (CuboidResult cuboidResult : result.values()) {
+                outputCuboid(cuboidResult.cuboidId, cuboidResult.table, output);
+                cuboidResult.table.close();
+            }
+        } finally {
+            output.close();
+        }
+    }
+
+    /**
+     * Build all the cuboids and wait for all the tasks finished. 
+     * 
+     * @param input
+     * @param listener
+     * @return
+     * @throws IOException
+     */
+    private <T> NavigableMap<Long, CuboidResult> buildAndCollect(final RecordConsumeBlockingQueueController<T> input,
+            final ICuboidResultListener listener) throws IOException {
+
+        long startTime = System.currentTimeMillis();
+        logger.info("In Mem Cube Build2 start, " + cubeDesc.getName());
+
+        // build base cuboid
+        buildBaseCuboid(input, listener);
+
+        ForkJoinWorkerThreadFactory factory = new ForkJoinWorkerThreadFactory() {
+            @Override
+            public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
+                final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
+                worker.setName("inmem-cubing-cuboid-worker-" + worker.getPoolIndex());
+                return worker;
+            }
+        };
+        ForkJoinPool builderPool = new ForkJoinPool(taskThreadCount, factory, null, true);
+        ForkJoinTask rootTask = builderPool.submit(new Runnable() {
+            @Override
+            public void run() {
+                startBuildFromBaseCuboid();
+            }
+        });
+        rootTask.join();
+
+        long endTime = System.currentTimeMillis();
+        logger.info("In Mem Cube Build2 end, " + cubeDesc.getName() + ", takes " + (endTime - startTime) + " ms");
+        logger.info("total CuboidResult count:" + resultCollector.getAllResult().size());
+        return resultCollector.getAllResult();
+    }
+
+    public ICuboidCollectorWithCallBack getResultCollector() {
+        return resultCollector;
+    }
+
+    public <T> CuboidResult buildBaseCuboid(RecordConsumeBlockingQueueController<T> input,
+            final ICuboidResultListener listener) throws IOException {
+        completedTaskQueue = new LinkedBlockingQueue<CuboidTask>();
+        taskCuboidCompleted = new AtomicInteger(0);
+
+        resultCollector = new DefaultCuboidCollectorWithCallBack(listener);
+
+        MemoryBudgetController.MemoryWaterLevel baseCuboidMemTracker = new MemoryWaterLevel();
+        baseCuboidMemTracker.markLow();
+        baseResult = createBaseCuboid(input, baseCuboidMemTracker);
+
+        if (baseResult.nRows == 0) {
+            taskCuboidCompleted.set(cuboidScheduler.getCuboidCount());
+            return baseResult;
+        }
+
+        baseCuboidMemTracker.markLow();
+        baseResult.aggrCacheMB = Math.max(baseCuboidMemTracker.getEstimateMB(), 10); // 10 MB at minimal
+
+        makeMemoryBudget();
+        return baseResult;
+    }
+
+    public CuboidResult buildCuboid(CuboidTask task) throws IOException {
+        CuboidResult newCuboid = buildCuboid(task.parent, task.childCuboidId);
+        completedTaskQueue.add(task);
+        addChildTasks(newCuboid);
+        return newCuboid;
+    }
+
+    private CuboidResult buildCuboid(CuboidResult parent, long cuboidId) throws IOException {
+        final String consumerName = "AggrCache@Cuboid " + cuboidId;
+        MemoryBudgetController.MemoryConsumer consumer = new MemoryBudgetController.MemoryConsumer() {
+            @Override
+            public int freeUp(int mb) {
+                return 0; // cannot free up on demand
+            }
+
+            @Override
+            public String toString() {
+                return consumerName;
+            }
+        };
+
+        // reserve memory for aggregation cache, can't be larger than the parent
+        memBudget.reserveInsist(consumer, parent.aggrCacheMB);
+        try {
+            return aggregateCuboid(parent, cuboidId);
+        } finally {
+            memBudget.reserve(consumer, 0);
+        }
+    }
+
+    public boolean isAllCuboidDone() {
+        return taskCuboidCompleted.get() == cuboidScheduler.getCuboidCount();
+    }
+
+    public void startBuildFromBaseCuboid() {
+        addChildTasks(baseResult);
+    }
+
+    private void addChildTasks(CuboidResult parent) {
+        List<Long> children = cuboidScheduler.getSpanningCuboid(parent.cuboidId);
+        if (children != null && !children.isEmpty()) {
+            List<CuboidTask> childTasks = Lists.newArrayListWithExpectedSize(children.size());
+            for (Long child : children) {
+                CuboidTask task = new CuboidTask(parent, child, this);
+                childTasks.add(task);
+                task.fork();
+            }
+            for (CuboidTask childTask : childTasks) {
+                childTask.join();
+            }
+        }
+    }
+
+    public Queue<CuboidTask> getCompletedTaskQueue() {
+        return completedTaskQueue;
+    }
+
+    private void makeMemoryBudget() {
+        int systemAvailMB = MemoryBudgetController.gcAndGetSystemAvailMB();
+        logger.info("System avail " + systemAvailMB + " MB");
+        int reserve = reserveMemoryMB;
+        logger.info("Reserve " + reserve + " MB for system basics");
+
+        int budget = systemAvailMB - reserve;
+        if (budget < baseResult.aggrCacheMB) {
+            // make sure we have base aggr cache as minimal
+            budget = baseResult.aggrCacheMB;
+            logger.warn("System avail memory (" + systemAvailMB + " MB) is less than base aggr cache ("
+                    + baseResult.aggrCacheMB + " MB) + minimal reservation (" + reserve
+                    + " MB), consider increase JVM heap -Xmx");
+        }
+
+        logger.info("Memory Budget is " + budget + " MB");
+        memBudget = new MemoryBudgetController(budget);
+    }
+
+    private <T> CuboidResult createBaseCuboid(RecordConsumeBlockingQueueController<T> input,
+            MemoryBudgetController.MemoryWaterLevel baseCuboidMemTracker) throws IOException {
+        logger.info("Calculating base cuboid " + baseCuboidId);
+
+        Stopwatch sw = new Stopwatch();
+        sw.start();
+        GridTable baseCuboid = newGridTableByCuboidID(baseCuboidId);
+        GTBuilder baseBuilder = baseCuboid.rebuild();
+        IGTScanner baseInput = new InputConverter<>(baseCuboid.getInfo(), input);
+
+        Pair<ImmutableBitSet, ImmutableBitSet> dimensionMetricsBitSet = InMemCubeBuilderUtils
+                .getDimensionAndMetricColumnBitSet(baseCuboidId, measureCount);
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(baseCuboid.getInfo()).setRanges(null).setDimensions(null)
+                .setAggrGroupBy(dimensionMetricsBitSet.getFirst()).setAggrMetrics(dimensionMetricsBitSet.getSecond())
+                .setAggrMetricsFuncs(metricsAggrFuncs).setFilterPushDown(null).createGTScanRequest();
+        GTAggregateScanner aggregationScanner = new GTAggregateScanner(baseInput, req);
+        aggregationScanner.trackMemoryLevel(baseCuboidMemTracker);
+
+        int count = 0;
+        for (GTRecord r : aggregationScanner) {
+            if (count == 0) {
+                baseCuboidMemTracker.markHigh();
+            }
+            baseBuilder.write(r);
+            count++;
+        }
+        aggregationScanner.close();
+        baseBuilder.close();
+
+        sw.stop();
+        logger.info("Cuboid " + baseCuboidId + " has " + count + " rows, build takes " + sw.elapsedMillis() + "ms");
+
+        int mbEstimateBaseAggrCache = (int) (aggregationScanner.getEstimateSizeOfAggrCache()
+                / MemoryBudgetController.ONE_MB);
+        logger.info("Wild estimate of base aggr cache is " + mbEstimateBaseAggrCache + " MB");
+
+        return updateCuboidResult(baseCuboidId, baseCuboid, count, sw.elapsedMillis(), 0,
+                input.inputConverterUnit.ifChange());
+    }
+
+    private CuboidResult updateCuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent,
+            int aggrCacheMB) {
+        return updateCuboidResult(cuboidId, table, nRows, timeSpent, aggrCacheMB, true);
+    }
+
+    private CuboidResult updateCuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB,
+            boolean ifCollect) {
+        if (aggrCacheMB <= 0 && baseResult != null) {
+            aggrCacheMB = (int) Math.round(
+                    (DERIVE_AGGR_CACHE_CONSTANT_FACTOR + DERIVE_AGGR_CACHE_VARIABLE_FACTOR * nRows / baseResult.nRows) //
+                            * baseResult.aggrCacheMB);
+        }
+
+        CuboidResult result = new CuboidResult(cuboidId, table, nRows, timeSpent, aggrCacheMB);
+        taskCuboidCompleted.incrementAndGet();
+
+        if (ifCollect) {
+            resultCollector.collectAndNotify(result);
+        }
+        return result;
+    }
+
+    protected CuboidResult aggregateCuboid(CuboidResult parent, long cuboidId) throws IOException {
+        final Pair<ImmutableBitSet, ImmutableBitSet> allNeededColumns = InMemCubeBuilderUtils
+                .getDimensionAndMetricColumnBitSet(parent.cuboidId, cuboidId, measureCount);
+        return scanAndAggregateGridTable(parent.table, newGridTableByCuboidID(cuboidId), parent.cuboidId,
+                cuboidId, allNeededColumns.getFirst(), allNeededColumns.getSecond());
+    }
+
+    private GTAggregateScanner prepareGTAggregationScanner(GridTable gridTable, long parentId, long cuboidId,
+            ImmutableBitSet aggregationColumns, ImmutableBitSet measureColumns) throws IOException {
+        GTInfo info = gridTable.getInfo();
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
+                .setAggrGroupBy(aggregationColumns).setAggrMetrics(measureColumns).setAggrMetricsFuncs(metricsAggrFuncs)
+                .setFilterPushDown(null).createGTScanRequest();
+        GTAggregateScanner scanner = (GTAggregateScanner) gridTable.scan(req);
+
+        // for child cuboid, some measures don't need aggregation.
+        if (parentId != cuboidId) {
+            boolean[] aggrMask = new boolean[measureDescs.length];
+            for (int i = 0; i < measureDescs.length; i++) {
+                aggrMask[i] = !measureDescs[i].getFunction().getMeasureType().onlyAggrInBaseCuboid();
+
+                if (!aggrMask[i]) {
+                    logger.info(measureDescs[i].toString() + " doesn't need aggregation.");
+                }
+            }
+            scanner.setAggrMask(aggrMask);
+        }
+
+        return scanner;
+    }
+
+    protected CuboidResult scanAndAggregateGridTable(GridTable gridTable, GridTable newGridTable, long parentId,
+            long cuboidId, ImmutableBitSet aggregationColumns, ImmutableBitSet measureColumns) throws IOException {
+        Stopwatch sw = new Stopwatch();
+        sw.start();
+        logger.info("Calculating cuboid " + cuboidId);
+
+        GTAggregateScanner scanner = prepareGTAggregationScanner(gridTable, parentId, cuboidId, aggregationColumns,
+                measureColumns);
+        GTBuilder builder = newGridTable.rebuild();
+
+        ImmutableBitSet allNeededColumns = aggregationColumns.or(measureColumns);
+
+        GTRecord newRecord = new GTRecord(newGridTable.getInfo());
+        int count = 0;
+        try {
+            for (GTRecord record : scanner) {
+                count++;
+                for (int i = 0; i < allNeededColumns.trueBitCount(); i++) {
+                    int c = allNeededColumns.trueBitAt(i);
+                    newRecord.set(i, record.get(c));
+                }
+                builder.write(newRecord);
+            }
+        } finally {
+            scanner.close();
+            builder.close();
+        }
+        sw.stop();
+        logger.info("Cuboid " + cuboidId + " has " + count + " rows, build takes " + sw.elapsedMillis() + "ms");
+
+        return updateCuboidResult(cuboidId, newGridTable, count, sw.elapsedMillis(), 0);
+    }
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidMapper.java
index 1beebc7..fc6edd3 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidMapper.java
@@ -20,31 +20,21 @@ package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.gridtable.CubeGridTable;
-import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder;
-import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
+import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
 import org.apache.kylin.cube.inmemcubing.InputConverterUnit;
 import org.apache.kylin.cube.inmemcubing.InputConverterUnitForBaseCuboid;
 import org.apache.kylin.cube.kv.CubeDimEncMap;
 import org.apache.kylin.engine.mr.ByteArrayWritable;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.gridtable.GTInfo;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 public class InMemCuboidFromBaseCuboidMapper
         extends InMemCuboidMapperBase<Text, Text, ByteArrayWritable, ByteArrayWritable, ByteArray> {
@@ -75,16 +65,8 @@ public class InMemCuboidFromBaseCuboidMapper
     }
 
     @Override
-    protected Future getCubingThreadFuture(Context context, Map<TblColRef, Dictionary<String>> dictionaryMap,
-            int reserveMemoryMB, CuboidScheduler cuboidScheduler) {
-        AbstractInMemCubeBuilder cubeBuilder = new DoggedCubeBuilder(cuboidScheduler, flatDesc, dictionaryMap);
-        cubeBuilder.setReserveMemoryMB(reserveMemoryMB);
-        cubeBuilder.setConcurrentThreads(taskThreadCount);
-
-        ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
-                .setNameFormat("inmemory-cube-building-from-base-cuboid-mapper-%d").build());
-        return executorService.submit(cubeBuilder.buildAsRunnable(queue, inputConverterUnit,
-                new MapContextGTRecordWriter(context, cubeDesc, cubeSegment)));
+    protected ICuboidWriter getCuboidWriter(Context context) {
+        return new MapContextGTRecordWriter(context, cubeDesc, cubeSegment);
     }
 
     @Override
@@ -98,5 +80,4 @@ public class InMemCuboidFromBaseCuboidMapper
 
         return new ByteArray(keyValue);
     }
-
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
index 551a17b..d363afc 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -19,24 +19,15 @@
 package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.cube.cuboid.CuboidScheduler;
-import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder;
-import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
+import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
 import org.apache.kylin.cube.inmemcubing.InputConverterUnit;
 import org.apache.kylin.cube.inmemcubing.InputConverterUnitForRawData;
 import org.apache.kylin.engine.mr.ByteArrayWritable;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.metadata.model.TblColRef;
 
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 public class InMemCuboidMapper<KEYIN>
         extends InMemCuboidMapperBase<KEYIN, Object, ByteArrayWritable, ByteArrayWritable, String[]> {
@@ -63,15 +54,7 @@ public class InMemCuboidMapper<KEYIN>
     }
 
     @Override
-    protected Future getCubingThreadFuture(Context context, Map<TblColRef, Dictionary<String>> dictionaryMap,
-            int reserveMemoryMB, CuboidScheduler cuboidScheduler) {
-        AbstractInMemCubeBuilder cubeBuilder = new DoggedCubeBuilder(cuboidScheduler, flatDesc, dictionaryMap);
-        cubeBuilder.setReserveMemoryMB(reserveMemoryMB);
-        cubeBuilder.setConcurrentThreads(taskThreadCount);
-
-        ExecutorService executorService = Executors.newSingleThreadExecutor(
-                new ThreadFactoryBuilder().setDaemon(true).setNameFormat("inmemory-cube-building-mapper-%d").build());
-        return executorService.submit(cubeBuilder.buildAsRunnable(queue, inputConverterUnit,
-                new MapContextGTRecordWriter(context, cubeDesc, cubeSegment)));
+    protected ICuboidWriter getCuboidWriter(Context context) {
+        return new MapContextGTRecordWriter(context, cubeDesc, cubeSegment);
     }
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
index e95ce8a..ce08b5c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
@@ -21,10 +21,13 @@ package org.apache.kylin.engine.mr.steps;
 import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Dictionary;
@@ -33,7 +36,10 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder;
 import org.apache.kylin.cube.inmemcubing.ConsumeBlockingQueueController;
+import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
+import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
 import org.apache.kylin.cube.inmemcubing.InputConverterUnit;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
@@ -74,11 +80,10 @@ public abstract class InMemCuboidMapperBase<KEYIN, VALUEIN, KEYOUT, VALUEOUT, T>
 
     protected abstract InputConverterUnit<T> getInputConverterUnit(Context context);
 
-    protected abstract Future getCubingThreadFuture(Context context, Map<TblColRef, Dictionary<String>> dictionaryMap,
-            int reserveMemoryMB, CuboidScheduler cuboidScheduler);
-
     protected abstract T getRecordFromKeyValue(KEYIN key, VALUEIN value);
 
+    protected abstract ICuboidWriter getCuboidWriter(Context context);
+
     @Override
     protected void doSetup(Context context) throws IOException {
         super.bindCurrentConfiguration(context.getConfiguration());
@@ -106,7 +111,24 @@ public abstract class InMemCuboidMapperBase<KEYIN, VALUEIN, KEYOUT, VALUEOUT, T>
         taskThreadCount = config.getCubeAlgorithmInMemConcurrentThreads();
         reserveMemoryMB = calculateReserveMB(conf);
         inputConverterUnit = getInputConverterUnit(context);
-        future = getCubingThreadFuture(context, dictionaryMap, reserveMemoryMB, cuboidScheduler);
+
+        AbstractInMemCubeBuilder cubeBuilder; // concrete builder class name is read from the segment config
+        try {
+            cubeBuilder = (AbstractInMemCubeBuilder) Class.forName(cubeSegment.getConfig().getCubeInMemBuilderClass())
+                    .getConstructor(CuboidScheduler.class, IJoinedFlatTableDesc.class, Map.class)
+                    .newInstance(cuboidScheduler, flatDesc, dictionaryMap);
+        } catch (Exception e) { // reflective creation failed: log with stack trace, then use DoggedCubeBuilder
+            logger.warn("Fail to initialize cube builder by class name "
+                    + cubeSegment.getConfig().getCubeInMemBuilderClass(), e);
+            cubeBuilder = new DoggedCubeBuilder(cuboidScheduler, flatDesc, dictionaryMap);
+        }
+        cubeBuilder.setReserveMemoryMB(reserveMemoryMB);
+        cubeBuilder.setConcurrentThreads(taskThreadCount);
+        // Run the builder on a single daemon thread; the task handle is stored in 'future'.
+        ExecutorService executorService = Executors.newSingleThreadExecutor(
+                new ThreadFactoryBuilder().setDaemon(true).setNameFormat("inmemory-cube-building-mapper-%d").build());
+        future = executorService
+                .submit(cubeBuilder.buildAsRunnable(queue, inputConverterUnit, getCuboidWriter(context)));
     }
 
     private int calculateReserveMB(Configuration configuration) {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
index 60d0870..43b7ee2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
@@ -25,7 +25,7 @@ import java.nio.ByteBuffer;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
+import org.apache.kylin.cube.inmemcubing.ICuboidGTTableWriter;
 import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.ByteArrayWritable;
@@ -36,7 +36,7 @@ import org.slf4j.LoggerFactory;
 
 /**
  */
-public abstract class KVGTRecordWriter implements ICuboidWriter {
+public abstract class KVGTRecordWriter extends ICuboidGTTableWriter {
 
     private static final Logger logger = LoggerFactory.getLogger(KVGTRecordWriter.class);
 
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
index be3d759..695455b 100644
--- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
@@ -32,6 +32,7 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GridTable;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -101,6 +102,11 @@ public class ITDoggedCubeBuilderStressTest extends LocalFileMetadataTestCase {
         }
 
         @Override
+        public void write(long cuboidId, GridTable table) throws IOException {
+            // No-op: the grid table passed in is ignored by this writer.
+        }
+
+        @Override
         public void flush() {
 
         }
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
index 8fcf9ed..6cfec84 100644
--- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
@@ -37,8 +37,11 @@ import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.inmemcubing2.DoggedCubeBuilder2;
+import org.apache.kylin.cube.inmemcubing2.InMemCubeBuilder2;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GridTable;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -48,6 +51,8 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Maps;
+
 /**
  */
 public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase {
@@ -89,10 +94,19 @@ public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase {
         long randSeed = System.currentTimeMillis();
 
         IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cube.getDescriptor());
+        InMemCubeBuilder inmemBuilder = new InMemCubeBuilder(cube.getCuboidScheduler(), flatDesc, dictionaryMap);
+        inmemBuilder.setConcurrentThreads(THREADS);
+        FileRecordWriter inmemResult = new FileRecordWriter();
+        {
+            Future<?> future = executorService.submit(inmemBuilder.buildAsRunnable(queue, inmemResult));
+            ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
+            future.get();
+            inmemResult.close();
+        }
+
         DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getCuboidScheduler(), flatDesc, dictionaryMap);
         doggedBuilder.setConcurrentThreads(THREADS);
         FileRecordWriter doggedResult = new FileRecordWriter();
-
         {
             Future<?> future = executorService.submit(doggedBuilder.buildAsRunnable(queue, doggedResult));
             ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed, SPLIT_ROWS);
@@ -100,20 +114,34 @@ public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase {
             doggedResult.close();
         }
 
-        InMemCubeBuilder inmemBuilder = new InMemCubeBuilder(cube.getCuboidScheduler(), flatDesc, dictionaryMap);
-        inmemBuilder.setConcurrentThreads(THREADS);
-        FileRecordWriter inmemResult = new FileRecordWriter();
-
+        InMemCubeBuilder2 inmemBuilder2 = new InMemCubeBuilder2(cube.getCuboidScheduler(), flatDesc, dictionaryMap);
+        inmemBuilder2.setConcurrentThreads(THREADS);
+        FileRecordWriter inmemResult2 = new FileRecordWriter();
         {
-            Future<?> future = executorService.submit(inmemBuilder.buildAsRunnable(queue, inmemResult));
+            Future<?> future = executorService.submit(inmemBuilder2.buildAsRunnable(queue, inmemResult2));
             ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
             future.get();
-            inmemResult.close();
+            inmemResult2.close();
         }
 
+        DoggedCubeBuilder2 doggedBuilder2 = new DoggedCubeBuilder2(cube.getCuboidScheduler(), flatDesc, dictionaryMap);
+        doggedBuilder2.setConcurrentThreads(THREADS);
+        FileRecordWriter doggedResult2 = new FileRecordWriter();
+        {
+            Future<?> future = executorService.submit(doggedBuilder2.buildAsRunnable(queue, doggedResult2));
+            ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed, SPLIT_ROWS);
+            future.get();
+            doggedResult2.close();
+        }
+
+        fileCompare(inmemResult.file, inmemResult2.file);
         fileCompare(inmemResult.file, doggedResult.file);
-        doggedResult.file.delete();
+        fileCompare2(inmemResult.file, doggedResult2.file);
+
         inmemResult.file.delete();
+        inmemResult2.file.delete();
+        doggedResult.file.delete();
+        doggedResult2.file.delete();
     }
 
     private void fileCompare(File file, File file2) throws IOException {
@@ -133,6 +161,27 @@ public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase {
         r2.close();
     }
 
+    private void fileCompare2(File file, File file2) throws IOException {
+        // Compare the files as line -> occurrence-count maps, so the order of lines does not matter.
+        Map<String, Integer> expected = readContents(file);
+        assertEquals(expected, readContents(file2));
+    }
+
+    private Map<String, Integer> readContents(File file) throws IOException {
+        // Returns a map from each distinct line of the UTF-8 file to the number of times it occurs.
+        Map<String, Integer> content = Maps.newHashMap();
+        BufferedReader r = new BufferedReader(new InputStreamReader(new FileInputStream(file), "UTF-8"));
+        try {
+            for (String line = r.readLine(); line != null; line = r.readLine()) {
+                Integer cnt = content.get(line);
+                content.put(line, cnt == null ? 1 : cnt + 1);
+            }
+        } finally {
+            r.close(); // always release the file handle, even when a read fails
+        }
+        return content;
+    }
+
     class FileRecordWriter implements ICuboidWriter {
 
         File file;
@@ -152,6 +201,14 @@ public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase {
         }
 
         @Override
+        public void write(long cuboidId, GridTable table) throws IOException {
+            // Emits one line per call: the cuboid id, then ", ", then the table's
+            // toString() text, followed by a line terminator.
+            writer.print(cuboidId + ", ");
+            writer.println(table.toString());
+        }
+
+        @Override
         public void flush() {
 
         }
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
index 2a96b39..0353313 100644
--- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
@@ -45,6 +45,7 @@ import org.apache.kylin.dict.DictionaryGenerator;
 import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GridTable;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
@@ -276,6 +277,12 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase {
         }
 
         @Override
+        public void write(long cuboidId, GridTable table) throws IOException {
+            if (verbose) // print the table's toString() to stdout only when the verbose flag is set
+                System.out.println(table.toString());
+        }
+
+        @Override
         public void flush() {
             if (verbose) {
                 System.out.println("flush");