You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/06/20 14:44:17 UTC

incubator-kylin git commit: KYLIN-803 Enable InMemCubeBuilder build multiple times

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8 01b89861e -> ed87c6f37


KYLIN-803 Enable InMemCubeBuilder build multiple times


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/ed87c6f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/ed87c6f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/ed87c6f3

Branch: refs/heads/0.8
Commit: ed87c6f37fccd824ee61dae11529806dba25c1a9
Parents: 01b8986
Author: Yang Li <li...@apache.org>
Authored: Sat Jun 20 20:43:48 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Sat Jun 20 20:43:48 2015 +0800

----------------------------------------------------------------------
 .../kylin/common/util/ImmutableBitSet.java      |   2 +
 .../job/hadoop/cubev2/InMemCuboidMapper.java    |   4 +-
 .../inmemcubing/AbstractInMemCubeBuilder.java   |  76 ++++++
 .../job/inmemcubing/DoggedCubeBuilder.java      | 237 +++++++++++++++++++
 .../kylin/job/inmemcubing/InMemCubeBuilder.java |  95 ++++----
 .../kylin/job/streaming/CubeStreamConsumer.java |   4 +-
 .../job/inmemcubing/InMemCubeBuilderTest.java   |  29 ++-
 7 files changed, 385 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ed87c6f3/common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java b/common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
index 807a03d..f13cd1d 100644
--- a/common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
+++ b/common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
@@ -39,10 +39,12 @@ public class ImmutableBitSet {
         }
     }
 
+    /** Returns the number of true bits in this set. */
     public int trueBitCount() {
         return arr.length;
     }
 
+    /** Returns the index of the i-th true bit; i is 0-based and must be less than {@link #trueBitCount()}. */
     public int trueBitAt(int i) {
         return arr[i];
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ed87c6f3/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
index fdb4823..4f6f295 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
@@ -75,9 +75,9 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, Imm
             }
         }
 
-        InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube.getDescriptor(), dictionaryMap, new MapContextGTRecordWriter(context, cubeDesc, cubeSegment));
+        InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap);
         ExecutorService executorService = Executors.newSingleThreadExecutor();
-        future = executorService.submit(cubeBuilder);
+        future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new MapContextGTRecordWriter(context, cubeDesc, cubeSegment)));
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ed87c6f3/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java
new file mode 100644
index 0000000..034c4cd
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java
@@ -0,0 +1,115 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.kylin.job.inmemcubing;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * An interface-like abstract class. Holds the cube definition, the dictionaries and the tunable parameters
+ * common to the in-memory cube builders, and nothing more.
+ * <p>
+ * Each call of {@link #build(BlockingQueue, ICuboidWriter)} takes its own input and output, so the same builder
+ * can run several builds one after another.
+ */
+abstract public class AbstractInMemCubeBuilder {
+
+    protected final CubeDesc cubeDesc;
+    protected final Map<TblColRef, Dictionary<?>> dictionaryMap;
+
+    /** number of threads building cuboids concurrently */
+    protected int taskThreadCount = 4;
+    /** whether cuboids must be output in order, e.g. for a sort merge of several outputs */
+    protected boolean outputOrderRequired = false;
+    /** memory (MB) reserved for system basics, not to be consumed by cube building */
+    protected int reserveMemoryMB = 100;
+
+    /**
+     * @param cubeDesc the cube to build, must not be null
+     * @param dictionaryMap the dictionaries keyed by column, must not be null or empty
+     * @throws NullPointerException if cubeDesc is null
+     * @throws IllegalArgumentException if dictionaryMap is null or empty
+     */
+    public AbstractInMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
+        if (cubeDesc == null)
+            throw new NullPointerException("cubeDesc cannot be null");
+        if (dictionaryMap == null || dictionaryMap.isEmpty())
+            throw new IllegalArgumentException("dictionary cannot be empty");
+
+        this.cubeDesc = cubeDesc;
+        this.dictionaryMap = dictionaryMap;
+    }
+
+    /**
+     * Sets the number of threads building cuboids concurrently, 4 by default.
+     *
+     * @throws IllegalArgumentException if n is not positive
+     */
+    public void setConcurrentThreads(int n) {
+        if (n <= 0)
+            throw new IllegalArgumentException("concurrent threads must be positive, but was " + n);
+        this.taskThreadCount = n;
+    }
+
+    /** Sets whether cuboids must be output in order, false by default. */
+    public void setOutputOrder(boolean required) {
+        this.outputOrderRequired = required;
+    }
+
+    /**
+     * Sets the memory (MB) reserved for system basics, 100 by default.
+     *
+     * @throws IllegalArgumentException if mb is negative
+     */
+    public void setReserveMemoryMB(int mb) {
+        if (mb < 0)
+            throw new IllegalArgumentException("reserve memory MB cannot be negative, but was " + mb);
+        this.reserveMemoryMB = mb;
+    }
+
+    /**
+     * Wraps {@link #build(BlockingQueue, ICuboidWriter)} into a Runnable, e.g. for submission to an executor.
+     * An IOException from the build is rethrown wrapped in a RuntimeException.
+     * <p>
+     * NOTE: do not run builds of the same instance concurrently; some subclasses (e.g. InMemCubeBuilder) keep
+     * per-build state in fields.
+     */
+    public Runnable buildAsRunnable(final BlockingQueue<List<String>> input, final ICuboidWriter output) {
+        return new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    build(input, output);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        };
+    }
+
+    /**
+     * Builds the cuboids of the cube from the rows taken from input, writing each cuboid record to output.
+     * An empty row (empty list) marks the end of input.
+     */
+    abstract public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ed87c6f3/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
new file mode 100644
index 0000000..02f24f7
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
@@ -0,0 +1,282 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.kylin.job.inmemcubing;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * When the base cuboid does not fit in memory, cuts the input into multiple splits, builds each split with its
+ * own {@link InMemCubeBuilder} in a separate thread, and merges the split outputs at last.
+ * <p>
+ * NOTE: merging the split outputs ({@link Merger#mergeAndOutput}) is not implemented yet.
+ */
+public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
+
+    private static final Logger logger = LoggerFactory.getLogger(DoggedCubeBuilder.class);
+
+    public DoggedCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
+        super(cubeDesc, dictionaryMap);
+    }
+
+    @Override
+    public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
+        // all per-build state lives in a fresh BuildOnce, so this builder can run multiple builds
+        new BuildOnce().build(input, output);
+    }
+
+    /** State and logic of a single build. */
+    private class BuildOnce {
+
+        /**
+         * Feeds the input to the current split until end of input, cutting a new split whenever the system
+         * available memory drops to the reserve, then merges the split outputs.
+         */
+        public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
+            List<SplitThread> splits = new ArrayList<SplitThread>();
+            Merger merger = new Merger();
+            SplitThread last = null;
+            boolean eof = false;
+
+            while (!eof) {
+
+                if (last != null && shouldCutSplit()) {
+                    cutSplit(last);
+                    last = null;
+                }
+
+                checkException(splits);
+
+                if (last == null) {
+                    last = new SplitThread(merger.newMergeSlot());
+                    splits.add(last);
+                    last.start();
+                }
+
+                eof = feedSomeInput(input, last, 1000);
+            }
+
+            merger.mergeAndOutput(splits, output);
+
+            checkException(splits);
+        }
+
+        /** If any split has failed, aborts the whole build and throws the errors. */
+        private void checkException(List<SplitThread> splits) throws IOException {
+            for (SplitThread split : splits) {
+                if (split.exception != null) {
+                    abort(splits);
+                    return;
+                }
+            }
+        }
+
+        /**
+         * Stops all splits, waits for them to terminate and rethrows their errors as one IOException, with the
+         * errors of the splits that failed on their own first. Returns normally only if no error is collected.
+         */
+        private void abort(List<SplitThread> splits) throws IOException {
+            // errors present before aborting are the root causes, collect them first
+            ArrayList<Throwable> errors = new ArrayList<Throwable>();
+            for (SplitThread split : splits) {
+                if (split.exception != null)
+                    errors.add(split.exception);
+            }
+
+            for (SplitThread split : splits) {
+                split.builder.abort();
+                // the split thread itself may be blocked taking input, interrupt it too or join() could hang
+                split.interrupt();
+            }
+
+            boolean interrupted = false;
+            for (SplitThread split : splits) {
+                try {
+                    split.join();
+                } catch (InterruptedException e) {
+                    interrupted = true;
+                    errors.add(e);
+                }
+                if (split.exception != null && !errors.contains(split.exception))
+                    errors.add(split.exception);
+            }
+            if (interrupted)
+                Thread.currentThread().interrupt(); // restore the interrupt status for the caller
+
+            if (errors.isEmpty()) {
+                return;
+            } else if (errors.size() == 1) {
+                Throwable t = errors.get(0);
+                if (t instanceof IOException)
+                    throw (IOException) t;
+                else
+                    throw new IOException(t);
+            } else {
+                for (Throwable t : errors)
+                    logger.error("Exception during in-mem cube build", t);
+                throw new IOException(errors.size() + " exceptions during in-mem cube build, cause set to the first, check log for more", errors.get(0));
+            }
+        }
+
+        /**
+         * Moves up to n records from input into the split's input queue.
+         *
+         * @return true if the end-of-input record (null or empty) was passed on, or if the split has failed
+         */
+        private boolean feedSomeInput(BlockingQueue<List<String>> input, SplitThread split, int n) {
+            try {
+                for (int i = 0; i < n; i++) {
+                    List<String> record = input.take();
+
+                    // offer with timeout: a failed split stops consuming, and we must not block on its full queue
+                    while (!split.inputQueue.offer(record, 1, TimeUnit.SECONDS)) {
+                        if (split.exception != null)
+                            return true; // got some error, thrown by the final checkException() of build()
+                    }
+
+                    if (record == null || record.isEmpty()) {
+                        return true;
+                    }
+                }
+                return false;
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+        }
+
+        /**
+         * Signals the end of input to the split, then waits until it has built all cuboids (its output may still
+         * be pending). NOTE(review): reads the split builder's progress from this thread; confirm the counters
+         * read by isAllCuboidDone() are safely published.
+         */
+        private void cutSplit(SplitThread last) {
+            try {
+                // signal the end of input; the timeout lets us notice if the split dies meanwhile
+                while (last.isAlive()) {
+                    if (last.inputQueue.offer(Collections.<String> emptyList(), 1, TimeUnit.SECONDS)) {
+                        break;
+                    }
+                }
+
+                // wait cuboid build done (but still pending output)
+                while (last.isAlive()) {
+                    if (last.builder.isAllCuboidDone()) {
+                        break;
+                    }
+                    Thread.sleep(1000);
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+        }
+
+        /** A split is cut when the system available memory drops to the reserved amount or below. */
+        private boolean shouldCutSplit() {
+            return MemoryBudgetController.getSystemAvailMB() <= reserveMemoryMB;
+        }
+    }
+
+    /** Builds one split of the input with its own InMemCubeBuilder, in its own thread. */
+    private class SplitThread extends Thread {
+        final BlockingQueue<List<String>> inputQueue = new ArrayBlockingQueue<List<String>>(64);
+        final BlockingQueue<Pair<Long, GTRecord>> outputQueue = new ArrayBlockingQueue<Pair<Long, GTRecord>>(64);
+        final InMemCubeBuilder builder;
+        final MergeSlot output;
+
+        // set by this thread and polled by the feeding thread, hence volatile
+        volatile RuntimeException exception;
+
+        public SplitThread(MergeSlot output) {
+            this.builder = new InMemCubeBuilder(cubeDesc, dictionaryMap);
+            this.builder.setConcurrentThreads(taskThreadCount);
+            this.builder.setOutputOrder(true); // sort merge requires order
+            this.builder.setReserveMemoryMB(reserveMemoryMB);
+
+            this.output = output;
+        }
+
+        @Override
+        public void run() {
+            try {
+                builder.build(inputQueue, output);
+            } catch (Throwable e) {
+                // catch Errors too (e.g. OutOfMemoryError), else the feeding thread never learns the split died
+                if (e instanceof RuntimeException)
+                    this.exception = (RuntimeException) e;
+                else
+                    this.exception = new RuntimeException(e);
+            }
+        }
+    }
+
+    /** Meant to merge the ordered outputs of all splits into the final output; not implemented yet. */
+    private class Merger {
+
+        public MergeSlot newMergeSlot() {
+            return new MergeSlot();
+        }
+
+        public void mergeAndOutput(List<SplitThread> splits, ICuboidWriter output) {
+            // TODO
+        }
+    }
+
+    /**
+     * Hands the records written by one split over to the merger. The queue holds one element, so write()
+     * returns only after the merger has taken the delivered record.
+     */
+    private static class MergeSlot implements ICuboidWriter {
+
+        BlockingQueue<MergeSlot> queue = new ArrayBlockingQueue<MergeSlot>(1);
+        long cuboidId;
+        GTRecord record;
+
+        @Override
+        public void write(long cuboidId, GTRecord record) throws IOException {
+            this.cuboidId = cuboidId;
+            this.record = record;
+
+            try {
+                // deliver the record
+                queue.put(this);
+
+                // confirm merger consumed (took) the record
+                queue.put(this);
+
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ed87c6f3/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
index b9c48aa..d9a0b8f 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
@@ -18,6 +18,7 @@ package org.apache.kylin.job.inmemcubing;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
@@ -48,19 +49,16 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
+ * Build a cube (many cuboids) in memory. Calculating multiple cuboids at the same time as long as memory permits.
+ * Assumes base cuboid fits in memory or otherwise OOM exception will occur.
  */
-public class InMemCubeBuilder implements Runnable {
+public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
 
     private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder.class);
     private static final LongWritable ONE = new LongWritable(1l);
 
-    private final BlockingQueue<List<String>> inputQueue;
-    private final ICuboidWriter outputWriter;
-
-    private final CubeDesc cubeDesc;
     private final long baseCuboidId;
     private final CuboidScheduler cuboidScheduler;
-    private final Map<TblColRef, Dictionary<?>> dictionaryMap;
     private final CubeJoinedFlatTableDesc intermediateTableDesc;
     private final MeasureCodec measureCodec;
     private final String[] metricsAggrFuncs;
@@ -70,7 +68,6 @@ public class InMemCubeBuilder implements Runnable {
     private final int measureCount;
 
     private MemoryBudgetController memBudget;
-    private int taskThreadCount = 4;
     private Thread[] taskThreads;
     private Throwable[] taskThreadExceptions;
     private TreeSet<CuboidTask> taskPending;
@@ -78,19 +75,12 @@ public class InMemCubeBuilder implements Runnable {
 
     private OutputThread outputThread;
     private int outputCuboidExpected;
-    private boolean outputOrderRequired;
     private CuboidResult baseResult;
     private Object[] totalSumForSanityCheck;
 
-    public InMemCubeBuilder(BlockingQueue<List<String>> queue, CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap, ICuboidWriter gtRecordWriter) {
-        if (dictionaryMap == null || dictionaryMap.isEmpty()) {
-            throw new IllegalArgumentException("dictionary cannot be empty");
-        }
-        this.inputQueue = queue;
-        this.cubeDesc = cubeDesc;
+    public InMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
+        super(cubeDesc, dictionaryMap);
         this.cuboidScheduler = new CuboidScheduler(cubeDesc);
-        this.dictionaryMap = dictionaryMap;
-        this.outputWriter = gtRecordWriter;
         this.baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
         this.intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
         this.measureCodec = new MeasureCodec(cubeDesc.getMeasures());
@@ -136,18 +126,10 @@ public class InMemCubeBuilder implements Runnable {
         this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
     }
 
-    public void setConcurrentThreads(int n) {
-        this.taskThreadCount = n;
-    }
-
-    public void setOutputOrder(boolean required) {
-        this.outputOrderRequired = required;
-    }
-
     private GridTable newGridTableByCuboidID(long cuboidID) throws IOException {
         GTInfo info = CubeGridTable.newGTInfo(cubeDesc, cuboidID, dictionaryMap);
 
-        // Before several store implementation are very similar in performance. The ConcurrentDiskStore is the simplest.
+        // Below several store implementation are very similar in performance. The ConcurrentDiskStore is the simplest.
         // MemDiskStore store = new MemDiskStore(info, memBudget == null ? MemoryBudgetController.ZERO_BUDGET : memBudget);
         // MemDiskStore store = new MemDiskStore(info, MemoryBudgetController.ZERO_BUDGET);
         ConcurrentDiskStore store = new ConcurrentDiskStore(info);
@@ -166,17 +148,7 @@ public class InMemCubeBuilder implements Runnable {
     }
 
     @Override
-    public void run() {
-        try {
-            build();
-        } catch (IOException e) {
-            logger.error("Fail to build cube", e);
-            throw new RuntimeException(e);
-        }
-
-    }
-
-    public void build() throws IOException {
+    public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
         long startTime = System.currentTimeMillis();
         logger.info("In Mem Cube Build start, " + cubeDesc.getName());
 
@@ -187,11 +159,12 @@ public class InMemCubeBuilder implements Runnable {
         taskThreadExceptions = new Throwable[taskThreadCount];
 
         // output goes in a separate thread to leverage any async-ness
-        outputThread = new OutputThread();
+        outputThread = new OutputThread(output);
         outputCuboidExpected = outputThread.getOutputCuboidExpected();
 
         // build base cuboid
-        baseResult = createBaseCuboid();
+        totalSumForSanityCheck = null;
+        baseResult = createBaseCuboid(input);
         taskCuboidCompleted.incrementAndGet();
         if (baseResult.nRows == 0)
             return;
@@ -213,11 +186,21 @@ public class InMemCubeBuilder implements Runnable {
 
         throwExceptionIfAny();
     }
+    
+    public void abort() {
+        interrupt(taskThreads); // either may still be null if build() has not created them yet
+        interrupt(outputThread);
+    }

     private void start(Thread... threads) {
         for (Thread t : threads)
             t.start();
     }
+    /** Interrupts the given threads, skipping a null array or null elements, e.g. when aborting an early build. */
+    private void interrupt(Thread... threads) {
+        for (Thread t : threads == null ? new Thread[0] : threads)
+            if (t != null) t.interrupt();
+    }
 
     private void join(Thread... threads) throws IOException {
         try {
@@ -261,6 +244,10 @@ public class InMemCubeBuilder implements Runnable {
         return result;
     }
 
+    public boolean isAllCuboidDone() {
+        return taskCuboidCompleted.get() >= outputCuboidExpected; // >= keeps !isAllCuboidDone() equal to the former "completed < expected" checks
+    }
+    
     private class CuboidTaskThread extends Thread {
         private int id;
 
@@ -272,7 +259,7 @@ public class InMemCubeBuilder implements Runnable {
         @Override
         public void run() {
             try {
-                while (taskCuboidCompleted.get() < outputCuboidExpected) {
+                while (!isAllCuboidDone()) {
                     CuboidTask task = null;
                     synchronized (taskPending) {
                         while (task == null && taskHasNoException()) {
@@ -291,7 +278,7 @@ public class InMemCubeBuilder implements Runnable {
                     task.parent.markOneSpanningDone();
                     taskCuboidCompleted.incrementAndGet();
 
-                    if (taskCuboidCompleted.get() == outputCuboidExpected) {
+                    if (isAllCuboidDone()) {
                         for (Thread t : taskThreads) {
                             if (t != Thread.currentThread())
                                 t.interrupt();
@@ -299,7 +286,7 @@ public class InMemCubeBuilder implements Runnable {
                     }
                 }
             } catch (Throwable ex) {
-                if (taskCuboidCompleted.get() < outputCuboidExpected) {
+                if (!isAllCuboidDone()) {
                     logger.error("task thread exception", ex);
                     taskThreadExceptions[id] = ex;
                 }
@@ -339,7 +326,7 @@ public class InMemCubeBuilder implements Runnable {
     private void makeMemoryBudget() {
         int systemAvailMB = getSystemAvailMB();
         logger.info("System avail " + systemAvailMB + " MB");
-        int reserve = Math.max(100, baseResult.aggrCacheMB / 3);
+        int reserve = Math.max(reserveMemoryMB, baseResult.aggrCacheMB / 3);
         logger.info("Reserve " + reserve + " MB for system basics");
 
         int budget = systemAvailMB - reserve;
@@ -350,15 +337,13 @@ public class InMemCubeBuilder implements Runnable {
         }
 
         logger.info("Memory Budget is " + budget + " MB");
-        if (budget > 0) {
-            memBudget = new MemoryBudgetController(budget);
-        }
+        memBudget = new MemoryBudgetController(budget);
     }
 
-    private CuboidResult createBaseCuboid() throws IOException {
+    private CuboidResult createBaseCuboid(BlockingQueue<List<String>> input) throws IOException {
         GridTable baseCuboid = newGridTableByCuboidID(baseCuboidId);
         GTBuilder baseBuilder = baseCuboid.rebuild();
-        IGTScanner baseInput = new InputConverter(baseCuboid.getInfo());
+        IGTScanner baseInput = new InputConverter(baseCuboid.getInfo(), input);
 
         int mbBefore = getSystemAvailMB();
         int mbAfter = 0;
@@ -384,7 +369,7 @@ public class InMemCubeBuilder implements Runnable {
         long timeSpent = System.currentTimeMillis() - startTime;
         logger.info("Cuboid " + baseCuboidId + " has " + count + " rows, build takes " + timeSpent + "ms");
 
-        int mbBaseAggrCacheOnHeap = mbBefore - mbAfter;
+        int mbBaseAggrCacheOnHeap = mbAfter == 0 ? 0 : mbBefore - mbAfter;
         int mbEstimateBaseAggrCache = (int) (aggregationScanner.getEstimateSizeOfAggrCache() / MemoryBudgetController.ONE_MB);
         int mbBaseAggrCache = Math.max((int) (mbBaseAggrCacheOnHeap * 1.1), mbEstimateBaseAggrCache);
         mbBaseAggrCache = Math.max(mbBaseAggrCache, 10); // let it be 10 MB at least
@@ -503,6 +488,7 @@ public class InMemCubeBuilder implements Runnable {
                 builder.write(newRecord);
             }
 
+            // disable sanity check for performance
             sanityCheck(scanner.getTotalSumForSanityCheck());
         } finally {
             scanner.close();
@@ -515,6 +501,7 @@ public class InMemCubeBuilder implements Runnable {
         return updateCuboidResult(cuboidId, newGridTable, count, timeSpent, 0);
     }
 
+    //@SuppressWarnings("unused")
     private void sanityCheck(Object[] totalSum) {
         // double sum introduces error and causes result not exactly equal
         for (int i = 0; i < totalSum.length; i++) {
@@ -599,14 +586,16 @@ public class InMemCubeBuilder implements Runnable {
     // ============================================================================
 
     private class OutputThread extends Thread {
+        private ICuboidWriter output;
         private SortedMap<Long, Long> outputSequence; // synchronized sorted map
         private LinkedBlockingDeque<CuboidResult> outputPending;
         private int outputCount;
         private int outputCuboidExpected;
         private Throwable outputThreadException;
 
-        OutputThread() {
+        OutputThread(ICuboidWriter output) {
             super("CuboidOutput");
+            this.output = output;
             this.outputSequence = prepareOutputSequence();
             this.outputPending = new LinkedBlockingDeque<CuboidResult>();
             this.outputCount = 0;
@@ -696,7 +685,7 @@ public class InMemCubeBuilder implements Runnable {
             GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
             IGTScanner scanner = gridTable.scan(req);
             for (GTRecord record : scanner) {
-                outputWriter.write(cuboidId, record);
+                output.write(cuboidId, record);
             }
             scanner.close();
             logger.info("Cuboid " + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
@@ -720,9 +709,11 @@ public class InMemCubeBuilder implements Runnable {
     private class InputConverter implements IGTScanner {
         GTInfo info;
         GTRecord record;
+        BlockingQueue<List<String>> input;
 
-        public InputConverter(GTInfo info) {
+        public InputConverter(GTInfo info, BlockingQueue<List<String>> input) {
             this.info = info;
+            this.input = input;
             this.record = new GTRecord(info);
         }
 
@@ -735,7 +726,7 @@ public class InMemCubeBuilder implements Runnable {
                 @Override
                 public boolean hasNext() {
                     try {
-                        currentObject = inputQueue.take();
+                        currentObject = input.take();
                     } catch (InterruptedException e) {
                         throw new RuntimeException(e);
                     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ed87c6f3/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
index 98435cb..0508a4e 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
@@ -111,9 +111,9 @@ public class CubeStreamConsumer implements MicroStreamBatchConsumer {
         final HTableInterface hTable = createHTable(cubeSegment);
 
         final CubeStreamRecordWriter gtRecordWriter = new CubeStreamRecordWriter(cubeDesc, hTable);
-        InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(blockingQueue, cubeInstance.getDescriptor(), dictionaryMap, gtRecordWriter);
+        InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(cubeInstance.getDescriptor(), dictionaryMap);
 
-        executorService.submit(inMemCubeBuilder).get();
+        executorService.submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, gtRecordWriter)).get();
         gtRecordWriter.flush();
         commitSegment(cubeSegment);
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ed87c6f3/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
index 63e303d..34e37f2 100644
--- a/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
+++ b/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
@@ -77,22 +77,39 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
     public void test() throws Exception {
         final int inputRows = 70000;
         final int threads = 4;
-        
+
         final CubeInstance cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
         final String flatTable = "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv";
 
         Map<TblColRef, Dictionary<?>> dictionaryMap = getDictionaryMap(cube, flatTable);
         ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
 
-        InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube.getDescriptor(), dictionaryMap, new ConsoleGTRecordWriter());
+        InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap);
         cubeBuilder.setConcurrentThreads(threads);
         ExecutorService executorService = Executors.newSingleThreadExecutor();
-        Future<?> future = executorService.submit(cubeBuilder);
-
-        feedData(cube, flatTable, queue, inputRows);
 
         try {
-            future.get();
+            // round 1
+            {
+                Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter()));
+                feedData(cube, flatTable, queue, inputRows);
+                future.get();
+            }
+            
+            // round 2, zero input
+            {
+                Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter()));
+                feedData(cube, flatTable, queue, 0);
+                future.get();
+            }
+            
+            // round 3
+            {
+                Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter()));
+                feedData(cube, flatTable, queue, inputRows);
+                future.get();
+            }
+            
         } catch (Exception e) {
             logger.error("stream build failed", e);
             throw new IOException("Failed to build cube ", e);