You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/06/25 01:43:41 UTC

incubator-kylin git commit: KYLIN-803 DoggedCubeBuilder that cuts input into splits and does the in-mem build one split at a time

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8 fe569ac4f -> e124e8f33


KYLIN-803 DoggedCubeBuilder that cuts input into splits and does the in-mem build one split at a time


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/e124e8f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/e124e8f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/e124e8f3

Branch: refs/heads/0.8
Commit: e124e8f335ef1da22826eb49b70c89d444e3e6de
Parents: fe569ac
Author: Yang Li <li...@apache.org>
Authored: Wed Jun 24 10:39:23 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Thu Jun 25 07:40:10 2015 +0800

----------------------------------------------------------------------
 .../kylin/cube/cuboid/CuboidScheduler.java      |  14 +-
 .../inmemcubing/AbstractInMemCubeBuilder.java   |  28 ++-
 .../job/inmemcubing/DoggedCubeBuilder.java      | 238 +++++++++---------
 .../kylin/job/inmemcubing/InMemCubeBuilder.java | 243 ++++---------------
 .../DoggedCubeBuilderStressTest.java            |  95 ++++++++
 .../job/inmemcubing/DoggedCubeBuilderTest.java  |   4 +-
 .../job/inmemcubing/InMemCubeBuilderTest.java   |   2 +-
 .../kylin/storage/gridtable/GridTable.java      |  10 +-
 8 files changed, 313 insertions(+), 321 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e124e8f3/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java b/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
index 07be092..bebfd08 100644
--- a/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
+++ b/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
@@ -19,8 +19,6 @@
 package org.apache.kylin.cube.cuboid;
 
 /** 
- * @author George Song (ysong1)
- * 
  */
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
@@ -42,6 +40,18 @@ public class CuboidScheduler {
         this.max = (long) Math.pow(2, size) - 1;
         this.cache = new ConcurrentHashMap<Long, List<Long>>();
     }
+    
+    public int getCuboidCount() {
+        return getCuboidCount(Cuboid.getBaseCuboidId(cubeDef));
+    }
+
+    private int getCuboidCount(long cuboidId) {
+        int r = 1;
+        for (Long child : getSpanningCuboid(cuboidId)) {
+            r += getCuboidCount(child);
+        }
+        return r;
+    }
 
     public List<Long> getSpanningCuboid(long cuboid) {
         if (cuboid > max || cuboid < 0) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e124e8f3/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java
index 034c4cd..0ff7767 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java
@@ -24,17 +24,24 @@ import java.util.concurrent.BlockingQueue;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.storage.gridtable.GTScanRequest;
+import org.apache.kylin.storage.gridtable.GridTable;
+import org.apache.kylin.storage.gridtable.IGTScanner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An interface alike abstract class. Hold common tunable parameters and nothing more.
  */
 abstract public class AbstractInMemCubeBuilder {
 
+    private static Logger logger = LoggerFactory.getLogger(AbstractInMemCubeBuilder.class);
+
     final protected CubeDesc cubeDesc;
     final protected Map<TblColRef, Dictionary<?>> dictionaryMap;
     
     protected int taskThreadCount = 4;
-    protected boolean outputOrderRequired = false;
     protected int reserveMemoryMB = 100;
 
     public AbstractInMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
@@ -51,10 +58,6 @@ abstract public class AbstractInMemCubeBuilder {
         this.taskThreadCount = n;
     }
 
-    public void setOutputOrder(boolean required) {
-        this.outputOrderRequired = required;
-    }
-    
     public void setReserveMemoryMB(int mb) {
         this.reserveMemoryMB = mb;
     }
@@ -72,5 +75,18 @@ abstract public class AbstractInMemCubeBuilder {
         };
     }
     
-    abstract public void build(BlockingQueue<List<String>> input, ICuboidWriter gtRecordWriter) throws IOException;
+    abstract public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException;
+
+    protected void outputCuboid(long cuboidId, GridTable gridTable, ICuboidWriter output) throws IOException {
+        long startTime = System.currentTimeMillis();
+        GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
+        IGTScanner scanner = gridTable.scan(req);
+        for (GTRecord record : scanner) {
+            output.write(cuboidId, record);
+        }
+        scanner.close();
+        logger.info("Cuboid " + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
+    }
+    
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e124e8f3/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
index dc7c695..b0c3d5c 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
@@ -19,10 +19,12 @@ package org.apache.kylin.job.inmemcubing;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
+import java.util.TreeMap;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -31,11 +33,14 @@ import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.job.inmemcubing.InMemCubeBuilder.CuboidResult;
 import org.apache.kylin.metadata.measure.MeasureAggregators;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.storage.cube.CuboidToGridTableMapping;
 import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.storage.gridtable.GTScanRequest;
+import org.apache.kylin.storage.gridtable.IGTScanner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,34 +72,63 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
 
     private class BuildOnce {
 
-        final List<SplitThread> splits = new ArrayList<SplitThread>();
-        final Merger merger = new Merger();
-
         public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
-            SplitThread last = null;
-            boolean eof = false;
+            final List<SplitThread> splits = new ArrayList<SplitThread>();
+            final Merger merger = new Merger();
+
+            long start = System.currentTimeMillis();
+            logger.info("Dogged Cube Build start");
+
+            try {
+                SplitThread last = null;
+                boolean eof = false;
 
-            while (!eof) {
+                while (!eof) {
 
-                if (last != null && shouldCutSplit()) {
-                    cutSplit(last);
-                    last = null;
+                    if (last != null && shouldCutSplit(splits)) {
+                        cutSplit(last);
+                        last = null;
+                    }
+
+                    checkException(splits);
+
+                    if (last == null) {
+                        last = new SplitThread();
+                        splits.add(last);
+                        last.start();
+                    }
+
+                    eof = feedSomeInput(input, last, unitRows);
                 }
 
+                for (SplitThread split : splits) {
+                    split.join();
+                }
                 checkException(splits);
+                logger.info("Dogged Cube Build splits complete, took " + (System.currentTimeMillis() - start) + " ms");
 
-                if (last == null) {
-                    last = new SplitThread(merger);
-                    splits.add(last);
-                    last.start();
-                }
+                merger.mergeAndOutput(splits, output);
 
-                eof = feedSomeInput(input, last, unitRows);
+            } catch (InterruptedException e) {
+                throw new IOException(e);
+            } finally {
+                closeGirdTables(splits);
+                logger.info("Dogged Cube Build end, totally took " + (System.currentTimeMillis() - start) + " ms");
             }
+        }
 
-            merger.mergeAndOutput(splits, output);
-
-            checkException(splits);
+        private void closeGirdTables(List<SplitThread> splits) {
+            for (SplitThread split : splits) {
+                if (split.buildResult != null) {
+                    for (CuboidResult r : split.buildResult.values()) {
+                        try {
+                            r.table.close();
+                        } catch (Throwable e) {
+                            logger.error("Error closing grid table " + r.table, e);
+                        }
+                    }
+                }
+            }
         }
 
         private void checkException(List<SplitThread> splits) throws IOException {
@@ -171,7 +205,7 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
                     Thread.sleep(1000);
                 }
 
-                // wait cuboid build done (but still pending output)
+                // wait cuboid build done
                 while (last.isAlive()) {
                     if (last.builder.isAllCuboidDone()) {
                         break;
@@ -183,7 +217,7 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
             }
         }
 
-        private boolean shouldCutSplit() {
+        private boolean shouldCutSplit(List<SplitThread> splits) {
             int systemAvailMB = MemoryBudgetController.getSystemAvailMB();
             int nSplit = splits.size();
             long splitRowCount = nSplit == 0 ? 0 : splits.get(nSplit - 1).inputRowCount;
@@ -197,24 +231,21 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
     private class SplitThread extends Thread {
         final BlockingQueue<List<String>> inputQueue = new ArrayBlockingQueue<List<String>>(16);
         final InMemCubeBuilder builder;
-        final MergeSlot output;
 
+        TreeMap<Long, CuboidResult> buildResult;
         long inputRowCount = 0;
         RuntimeException exception;
 
-        public SplitThread(Merger merger) {
+        public SplitThread() {
             this.builder = new InMemCubeBuilder(cubeDesc, dictionaryMap);
             this.builder.setConcurrentThreads(taskThreadCount);
-            this.builder.setOutputOrder(true); // merge sort requires order
             this.builder.setReserveMemoryMB(reserveMemoryMB);
-
-            this.output = merger.newMergeSlot(this);
         }
 
         @Override
         public void run() {
             try {
-                builder.build(inputQueue, output);
+                buildResult = builder.build(inputQueue);
             } catch (Exception e) {
                 if (e instanceof RuntimeException)
                     this.exception = (RuntimeException) e;
@@ -239,74 +270,55 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
             reuseMetricsArray = new Object[measures.length];
         }
 
-        public MergeSlot newMergeSlot(SplitThread split) {
-            return new MergeSlot(split);
-        }
-
         public void mergeAndOutput(List<SplitThread> splits, ICuboidWriter output) throws IOException {
-            LinkedList<MergeSlot> open = Lists.newLinkedList();
-            for (SplitThread split : splits)
-                open.add(split.output);
-
             if (splits.size() == 1) {
-                splits.get(0).output.directOutput = output;
-            }
-
-            try {
-                PriorityQueue<MergeSlot> heap = new PriorityQueue<MergeSlot>();
-                boolean hasMore = true;
-
-                while (hasMore) {
-                    takeRecordsFromAllOpenSlots(open, heap);
-                    hasMore = mergeAndOutputOneRecord(heap, open, output);
+                for (CuboidResult cuboidResult : splits.get(0).buildResult.values()) {
+                    outputCuboid(cuboidResult.cuboidId, cuboidResult.table, output);
+                    cuboidResult.table.close();
                 }
-
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
+                return;
             }
-        }
 
-        private void takeRecordsFromAllOpenSlots(LinkedList<MergeSlot> open, PriorityQueue<MergeSlot> heap) throws InterruptedException {
-            while (!open.isEmpty()) {
-                MergeSlot slot = open.getFirst();
-                // ready one record in the slot
-                if (slot.readySignal.poll(1, TimeUnit.SECONDS) != null) {
-                    open.removeFirst();
-                    heap.add(slot);
-                } else if (slot.isClosed()) {
-                    open.removeFirst();
-                }
+            LinkedList<MergeSlot> open = Lists.newLinkedList();
+            for (SplitThread split : splits) {
+                open.add(new MergeSlot(split));
             }
-            return;
-        }
 
-        private boolean mergeAndOutputOneRecord(PriorityQueue<MergeSlot> heap, LinkedList<MergeSlot> open, ICuboidWriter output) throws IOException, InterruptedException {
-            MergeSlot smallest = heap.poll();
-            if (smallest == null)
-                return false;
-            open.add(smallest);
-
-            if (smallest.isSameKey(heap.peek())) {
-                Object[] metrics = getMetricsValues(smallest.record);
-                reuseAggrs.reset();
-                reuseAggrs.aggregate(metrics);
-                do {
-                    MergeSlot slot = heap.poll();
-                    open.add(slot);
-                    metrics = getMetricsValues(slot.record);
-                    reuseAggrs.aggregate(metrics);
-                } while (smallest.isSameKey(heap.peek()));
+            PriorityQueue<MergeSlot> heap = new PriorityQueue<MergeSlot>();
 
-                reuseAggrs.collectStates(metrics);
-                setMetricsValues(smallest.record, metrics);
-            }
+            while (true) {
+                // ready records in open slots and add to heap
+                while (!open.isEmpty()) {
+                    MergeSlot slot = open.removeFirst();
+                    if (slot.fetchNext()) {
+                        heap.add(slot);
+                    }
+                }
+
+                // find the smallest on heap
+                MergeSlot smallest = heap.poll();
+                if (smallest == null)
+                    break;
+                open.add(smallest);
 
-            output.write(smallest.cuboidId, smallest.record);
+                // merge with slots having the same key
+                if (smallest.isSameKey(heap.peek())) {
+                    Object[] metrics = getMetricsValues(smallest.currentRecord);
+                    reuseAggrs.reset();
+                    reuseAggrs.aggregate(metrics);
+                    do {
+                        MergeSlot slot = heap.poll();
+                        open.add(slot);
+                        metrics = getMetricsValues(slot.currentRecord);
+                        reuseAggrs.aggregate(metrics);
+                    } while (smallest.isSameKey(heap.peek()));
+
+                    reuseAggrs.collectStates(metrics);
+                    setMetricsValues(smallest.currentRecord, metrics);
+                }
 
-            for (MergeSlot slot : open) {
-                slot.consumedSignal.put(this);
+                output.write(smallest.currentCuboidId, smallest.currentRecord);
             }
-            return true;
         }
 
         private void setMetricsValues(GTRecord record, Object[] metricsValues) {
@@ -337,58 +349,52 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
         }
     }
 
-    private static class MergeSlot implements ICuboidWriter, Comparable<MergeSlot> {
+    private static class MergeSlot implements Comparable<MergeSlot> {
 
-        final SplitThread split;
-        final BlockingQueue<Object> readySignal = new ArrayBlockingQueue<Object>(1);
-        final BlockingQueue<Object> consumedSignal = new ArrayBlockingQueue<Object>(1);
+        final Iterator<CuboidResult> cuboidIterator;
+        IGTScanner scanner;
+        Iterator<GTRecord> recordIterator;
 
-        ICuboidWriter directOutput = null;
-        long cuboidId;
-        GTRecord record;
+        long currentCuboidId;
+        GTRecord currentRecord;
 
         public MergeSlot(SplitThread split) {
-            this.split = split;
+            cuboidIterator = split.buildResult.values().iterator();
         }
 
-        @Override
-        public void write(long cuboidId, GTRecord record) throws IOException {
-            // when only one split left
-            if (directOutput != null) {
-                directOutput.write(cuboidId, record);
-                return;
+        public boolean fetchNext() throws IOException {
+            if (recordIterator == null) {
+                if (cuboidIterator.hasNext()) {
+                    CuboidResult cuboid = cuboidIterator.next();
+                    currentCuboidId = cuboid.cuboidId;
+                    scanner = cuboid.table.scan(new GTScanRequest(cuboid.table.getInfo(), null, null, null));
+                    recordIterator = scanner.iterator();
+                } else {
+                    return false;
+                }
             }
 
-            this.cuboidId = cuboidId;
-            this.record = record;
-
-            try {
-                // signal record is ready
-                readySignal.put(this);
-
-                // wait record be consumed
-                consumedSignal.take();
-
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
+            if (recordIterator.hasNext()) {
+                currentRecord = recordIterator.next();
+                return true;
+            } else {
+                scanner.close();
+                recordIterator = null;
+                return fetchNext();
             }
         }
 
-        public boolean isClosed() {
-            return split.isAlive() == false;
-        }
-
         @Override
         public int compareTo(MergeSlot o) {
-            long cuboidComp = this.cuboidId - o.cuboidId;
+            long cuboidComp = this.currentCuboidId - o.currentCuboidId;
             if (cuboidComp != 0)
                 return cuboidComp < 0 ? -1 : 1;
 
             // note GTRecord.equals() don't work because the two GTRecord comes from different GridTable
-            ImmutableBitSet pk = this.record.getInfo().getPrimaryKey();
+            ImmutableBitSet pk = this.currentRecord.getInfo().getPrimaryKey();
             for (int i = 0; i < pk.trueBitCount(); i++) {
                 int c = pk.trueBitAt(i);
-                int comp = this.record.get(c).compareTo(o.record.get(c));
+                int comp = this.currentRecord.get(c).compareTo(o.currentRecord.get(c));
                 if (comp != 0)
                     return comp;
             }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e124e8f3/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
index 8d0b0fb..3c3d834 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
@@ -16,21 +16,16 @@
  */
 package org.apache.kylin.job.inmemcubing;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.io.DoubleWritable;
@@ -57,7 +52,6 @@ import org.apache.kylin.storage.gridtable.GTRecord;
 import org.apache.kylin.storage.gridtable.GTScanRequest;
 import org.apache.kylin.storage.gridtable.GridTable;
 import org.apache.kylin.storage.gridtable.IGTScanner;
-import org.apache.kylin.storage.gridtable.IGTStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,8 +67,9 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
     private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder.class);
     private static final LongWritable ONE = new LongWritable(1l);
 
-    private final long baseCuboidId;
     private final CuboidScheduler cuboidScheduler;
+    private final long baseCuboidId;
+    private final int totalCuboidCount;
     private final CubeJoinedFlatTableDesc intermediateTableDesc;
     private final MeasureCodec measureCodec;
     private final String[] metricsAggrFuncs;
@@ -88,15 +83,15 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
     private TreeSet<CuboidTask> taskPending;
     private AtomicInteger taskCuboidCompleted = new AtomicInteger(0);
 
-    private OutputThread outputThread;
-    private int outputCuboidExpected;
     private CuboidResult baseResult;
     private Object[] totalSumForSanityCheck;
+    private ICuboidCollector resultCollector;
 
     public InMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
         super(cubeDesc, dictionaryMap);
         this.cuboidScheduler = new CuboidScheduler(cubeDesc);
         this.baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+        this.totalCuboidCount = cuboidScheduler.getCuboidCount();
         this.intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
         this.measureCodec = new MeasureCodec(cubeDesc.getMeasures());
 
@@ -154,6 +149,46 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
 
     @Override
     public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
+        TreeMap<Long, CuboidResult> result = build(input);
+        for (CuboidResult cuboidResult : result.values()) {
+            outputCuboid(cuboidResult.cuboidId, cuboidResult.table, output);
+            cuboidResult.table.close();
+        }
+    }
+    
+    TreeMap<Long, CuboidResult> build(BlockingQueue<List<String>> input) throws IOException {
+        final TreeMap<Long, CuboidResult> result = new TreeMap<Long, CuboidResult>();
+        ICuboidCollector collector = new ICuboidCollector() {
+            @Override
+            public void collect(CuboidResult cuboidResult) {
+                result.put(cuboidResult.cuboidId, cuboidResult);
+            }
+        };
+        build(input, collector);
+        return result;
+    }
+    
+    static interface ICuboidCollector {
+        public void collect(CuboidResult result);
+    }
+    
+    static class CuboidResult {
+        public long cuboidId;
+        public GridTable table;
+        public int nRows;
+        public long timeSpent;
+        public int aggrCacheMB;
+
+        public CuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB) {
+            this.cuboidId = cuboidId;
+            this.table = table;
+            this.nRows = nRows;
+            this.timeSpent = timeSpent;
+            this.aggrCacheMB = aggrCacheMB;
+        }
+    }
+
+    private void build(BlockingQueue<List<String>> input, ICuboidCollector collector) throws IOException {
         long startTime = System.currentTimeMillis();
         logger.info("In Mem Cube Build start, " + cubeDesc.getName());
 
@@ -163,14 +198,10 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         taskThreads = prepareTaskThreads();
         taskThreadExceptions = new Throwable[taskThreadCount];
 
-        // output goes in a separate thread to leverage any async-ness
-        outputThread = new OutputThread(output);
-        outputCuboidExpected = outputThread.getOutputCuboidExpected();
-
         // build base cuboid
+        resultCollector = collector;
         totalSumForSanityCheck = null;
         baseResult = createBaseCuboid(input);
-        taskCuboidCompleted.incrementAndGet();
         if (baseResult.nRows == 0)
             return;
 
@@ -180,11 +211,9 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         // kick off N-D cuboid tasks and output
         addChildTasks(baseResult);
         start(taskThreads);
-        start(outputThread);
 
         // wait complete
         join(taskThreads);
-        join(outputThread);
 
         long endTime = System.currentTimeMillis();
         logger.info("In Mem Cube Build end, " + cubeDesc.getName() + ", takes " + (endTime - startTime) + " ms");
@@ -194,7 +223,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
     
     public void abort() {
         interrupt(taskThreads);
-        interrupt(outputThread);
     }
 
     private void start(Thread... threads) {
@@ -223,9 +251,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
             if (t != null)
                 errors.add(t);
         }
-        if (outputThread.getException() != null) {
-            errors.add(outputThread.getException());
-        }
         if (errors.isEmpty()) {
             return;
         } else if (errors.size() == 1) {
@@ -250,7 +275,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
     }
 
     public boolean isAllCuboidDone() {
-        return taskCuboidCompleted.get() == outputCuboidExpected;
+        return taskCuboidCompleted.get() == totalCuboidCount;
     }
     
     private class CuboidTaskThread extends Thread {
@@ -280,8 +305,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
 
                     CuboidResult newCuboid = buildCuboid(task.parent, task.childCuboidId);
                     addChildTasks(newCuboid);
-                    task.parent.markOneSpanningDone();
-                    taskCuboidCompleted.incrementAndGet();
 
                     if (isAllCuboidDone()) {
                         for (Thread t : taskThreads) {
@@ -387,8 +410,11 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         if (aggrCacheMB <= 0) {
             aggrCacheMB = (int) Math.ceil(1.0 * nRows / baseResult.nRows * baseResult.aggrCacheMB);
         }
+        
         CuboidResult result = new CuboidResult(cuboidId, table, nRows, timeSpent, aggrCacheMB);
-        outputThread.addOutput(result);
+        taskCuboidCompleted.incrementAndGet();
+        
+        resultCollector.collect(result);
         return result;
     }
 
@@ -397,7 +423,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         MemoryBudgetController.MemoryConsumer consumer = new MemoryBudgetController.MemoryConsumer() {
             @Override
             public int freeUp(int mb) {
-                return 0; // cannot free up
+                return 0; // cannot free up on demand
             }
 
             @Override
@@ -493,17 +519,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         }
     }
 
-    private void closeStore(GridTable gt) {
-        IGTStore store = gt.getStore();
-        if (store instanceof Closeable) {
-            try {
-                ((Closeable) store).close();
-            } catch (IOException e) {
-                logger.warn("Close " + store + " exception", e);
-            }
-        }
-    }
-
     // ===========================================================================
 
     private static class CuboidTask implements Comparable<CuboidTask> {
@@ -522,162 +537,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         }
     }
 
-    private class CuboidResult {
-        long cuboidId;
-        GridTable table;
-        int nRows;
-        @SuppressWarnings("unused")
-        long timeSpent;
-        int aggrCacheMB;
-        boolean outputDone;
-        int spanningDone;
-
-        public CuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB) {
-            this.cuboidId = cuboidId;
-            this.table = table;
-            this.nRows = nRows;
-            this.timeSpent = timeSpent;
-            this.aggrCacheMB = aggrCacheMB;
-        }
-
-        synchronized void markOutputDone() {
-            outputDone = true;
-            closeIfAllDone();
-        }
-
-        synchronized void markOneSpanningDone() {
-            spanningDone++;
-            closeIfAllDone();
-        }
-
-        private void closeIfAllDone() {
-            if (outputDone && spanningDone == cuboidScheduler.getSpanningCuboid(cuboidId).size()) {
-                closeStore(table);
-            }
-        }
-    }
-
-    // ============================================================================
-
-    private class OutputThread extends Thread {
-        private ICuboidWriter output;
-        private SortedMap<Long, Long> outputSequence; // synchronized sorted map
-        private LinkedBlockingDeque<CuboidResult> outputPending;
-        private int outputCount;
-        private int outputCuboidExpected;
-        private Throwable outputThreadException;
-
-        OutputThread(ICuboidWriter output) {
-            super("CuboidOutput");
-            this.output = output;
-            this.outputSequence = prepareOutputSequence();
-            this.outputPending = new LinkedBlockingDeque<CuboidResult>();
-            this.outputCount = 0;
-            this.outputCuboidExpected = outputSequence.size();
-
-            if (outputOrderRequired == false)
-                outputSequence = null;
-        }
-
-        public int getOutputCuboidExpected() {
-            return outputCuboidExpected;
-        }
-
-        private SortedMap<Long, Long> prepareOutputSequence() {
-            TreeMap<Long, Long> result = new TreeMap<Long, Long>();
-            prepareOutputPendingRecursive(Cuboid.getBaseCuboidId(cubeDesc), result);
-            return Collections.synchronizedSortedMap(result);
-        }
-
-        private void prepareOutputPendingRecursive(Long cuboidId, TreeMap<Long, Long> result) {
-            result.put(cuboidId, cuboidId);
-            for (Long child : cuboidScheduler.getSpanningCuboid(cuboidId)) {
-                prepareOutputPendingRecursive(child, result);
-            }
-        }
-
-        public void addOutput(CuboidResult result) {
-            // if output is NOT ordered
-            if (outputSequence == null) {
-                outputPending.addLast(result);
-            }
-            // if output is ordered
-            else {
-                Long cuboidId = outputSequence.get(result.cuboidId);
-                synchronized (cuboidId) {
-                    outputPending.addFirst(result);
-                    cuboidId.notify();
-                }
-            }
-        }
-
-        private CuboidResult nextOutput() throws InterruptedException {
-            CuboidResult result = null;
-
-            // if output is NOT ordered
-            if (outputSequence == null) {
-                while (result == null && taskHasNoException()) {
-                    result = outputPending.pollFirst(60, TimeUnit.SECONDS);
-                }
-            }
-            // if output is ordered
-            else {
-                Long nextCuboidId = outputSequence.get(outputSequence.firstKey());
-                synchronized (nextCuboidId) {
-                    while ((result = findPendingOutput(nextCuboidId)) == null && taskHasNoException()) {
-                        nextCuboidId.wait(60000);
-                    }
-                }
-                outputSequence.remove(result.cuboidId);
-            }
-
-            return result;
-        }
-
-        @Override
-        public void run() {
-            try {
-                while (outputCount < outputCuboidExpected) {
-                    CuboidResult result = nextOutput();
-
-                    // if task error occurs
-                    if (result == null || result.table == null)
-                        break;
-
-                    outputCuboid(result.cuboidId, result.table);
-                    outputCount++;
-                    result.markOutputDone();
-                }
-            } catch (Throwable ex) {
-                logger.error("output thread exception", ex);
-                outputThreadException = ex;
-            }
-        }
-
-        private void outputCuboid(long cuboidId, GridTable gridTable) throws IOException {
-            long startTime = System.currentTimeMillis();
-            GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
-            IGTScanner scanner = gridTable.scan(req);
-            for (GTRecord record : scanner) {
-                output.write(cuboidId, record);
-            }
-            scanner.close();
-            logger.info("Cuboid " + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
-        }
-
-        private CuboidResult findPendingOutput(Long cuboidId) {
-            for (CuboidResult r : outputPending) {
-                if (r.cuboidId == cuboidId)
-                    return r;
-            }
-            return null;
-        }
-
-        public Throwable getException() {
-            return outputThreadException;
-        }
-    }
-
     // ============================================================================
 
     private class InputConverter implements IGTScanner {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e124e8f3/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderStressTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderStressTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderStressTest.java
new file mode 100644
index 0000000..d5563b7
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderStressTest.java
@@ -0,0 +1,95 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.kylin.job.inmemcubing;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Stress test: feeds enough random rows through {@link DoggedCubeBuilder} to hit its memory threshold; output is discarded. */
+public class DoggedCubeBuilderStressTest extends LocalFileMetadataTestCase {
+
+    private static final Logger logger = LoggerFactory.getLogger(DoggedCubeBuilderStressTest.class);
+
+    // The CI sandbox has no more than 512MB of memory; this many input rows should hit the memory threshold.
+    private static final int INPUT_ROWS = 200000;
+    private static final int THREADS = 4;
+
+    private static CubeInstance cube;
+    private static String flatTable;
+    private static Map<TblColRef, Dictionary<?>> dictionaryMap;
+
+    @BeforeClass
+    public static void before() throws IOException {
+        staticCreateTestMetadata();
+
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+
+        cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
+        flatTable = "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv";
+        dictionaryMap = InMemCubeBuilderTest.getDictionaryMap(cube, flatTable);
+    }
+
+    @AfterClass
+    public static void after() throws Exception {
+        staticCleanupTestMetadata();
+    }
+
+    @Test
+    public void test() throws Exception {
+        ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        long randSeed = System.currentTimeMillis();
+        logger.info("Random seed: {}", randSeed); // logged so that a failing run can be reproduced
+        DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
+        doggedBuilder.setConcurrentThreads(THREADS);
+
+        try {
+            Future<?> future = executorService.submit(doggedBuilder.buildAsRunnable(queue, new NoopWriter()));
+            InMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
+            future.get(); // rethrows any build failure, wrapped in ExecutionException
+        } finally {
+            executorService.shutdownNow(); // also interrupts the builder thread if feeding failed midway
+        }
+    }
+
+    /** Discards every cuboid record; only the build itself is under test. */
+    static class NoopWriter implements ICuboidWriter {
+        @Override
+        public void write(long cuboidId, GTRecord record) throws IOException {
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e124e8f3/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderTest.java
index 5c19df3..a87f950 100644
--- a/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderTest.java
+++ b/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderTest.java
@@ -82,10 +82,9 @@ public class DoggedCubeBuilderTest extends LocalFileMetadataTestCase {
 
         ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
         ExecutorService executorService = Executors.newSingleThreadExecutor();
-        long randSeed = 101;
+        long randSeed = System.currentTimeMillis();
 
         DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
-        doggedBuilder.setOutputOrder(true);
         doggedBuilder.setConcurrentThreads(THREADS);
         doggedBuilder.setSplitRowThreshold(SPLIT_ROWS);
         FileRecordWriter doggedResult = new FileRecordWriter();
@@ -98,7 +97,6 @@ public class DoggedCubeBuilderTest extends LocalFileMetadataTestCase {
         }
 
         InMemCubeBuilder inmemBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap);
-        inmemBuilder.setOutputOrder(true);
         inmemBuilder.setConcurrentThreads(THREADS);
         FileRecordWriter inmemResult = new FileRecordWriter();
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e124e8f3/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
index 2a4cf8a..9600ef7 100644
--- a/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
+++ b/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
@@ -85,7 +85,7 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
     public void test() throws Exception {
 
         InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap);
-        cubeBuilder.setOutputOrder(true);
+        //DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
         cubeBuilder.setConcurrentThreads(THREADS);
         
         ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e124e8f3/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java
index 20b543a..092227b 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java
@@ -1,8 +1,9 @@
 package org.apache.kylin.storage.gridtable;
 
+import java.io.Closeable;
 import java.io.IOException;
 
-public class GridTable {
+public class GridTable implements Closeable {
 
     final GTInfo info;
     final IGTStore store;
@@ -50,4 +51,11 @@ public class GridTable {
     public IGTStore getStore() {
         return store;
     }
+
+    @Override
+    public void close() throws IOException { // no-op unless the backing store is itself Closeable
+        if (!(store instanceof Closeable))
+            return;
+        ((Closeable) store).close();
+    }
 }