Posted to commits@kylin.apache.org by qh...@apache.org on 2015/05/28 09:36:24 UTC

[1/2] incubator-kylin git commit: streaming cubing

Repository: incubator-kylin
Updated Branches:
  refs/heads/streaming-cubing [created] 867425938


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/86742593/streaming/src/test/java/org/apache/kylin/streaming/cube/InMemCubeBuilderBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/cube/InMemCubeBuilderBenchmarkTest.java b/streaming/src/test/java/org/apache/kylin/streaming/cube/InMemCubeBuilderBenchmarkTest.java
new file mode 100644
index 0000000..b530cdc
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/cube/InMemCubeBuilderBenchmarkTest.java
@@ -0,0 +1,117 @@
+package org.apache.kylin.streaming.cube;
+
+import com.google.common.collect.Maps;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.DimensionDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ */
+public class InMemCubeBuilderBenchmarkTest extends LocalFileMetadataTestCase {
+
+    private static final Logger logger = LoggerFactory.getLogger(InMemCubeBuilderBenchmarkTest.class);
+
+    private static final int BENCHMARK_RECORD_LIMIT = 2000000;
+    private static final String CUBE_NAME = "test_kylin_cube_with_slr_1_new_segment";
+
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    private Map<TblColRef, Dictionary<?>> getDictionaryMap(CubeSegment cubeSegment) {
+        final Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
+        final CubeDesc desc = cubeSegment.getCubeDesc();
+        for (DimensionDesc dim : desc.getDimensions()) {
+            // dictionary
+            for (TblColRef col : dim.getColumnRefs()) {
+                if (desc.getRowkey().isUseDictionary(col)) {
+                    Dictionary dict = cubeSegment.getDictionary(col);
+                    if (dict == null) {
+                        throw new IllegalArgumentException("Dictionary for " + col + " was not found.");
+                    }
+                    logger.info("Dictionary for " + col + " was put into dictionary map.");
+                    dictionaryMap.put(col, cubeSegment.getDictionary(col));
+                }
+            }
+        }
+        return dictionaryMap;
+    }
+
+    private static class ConsoleGTRecordWriter implements IGTRecordWriter {
+
+        boolean verbose = false;
+
+        @Override
+        public void write(Long cuboidId, GTRecord record) throws IOException {
+            if (verbose)
+                System.out.println(record.toString());
+        }
+    }
+
+    private void loadDataFromLocalFile(LinkedBlockingQueue queue) throws IOException, InterruptedException {
+        BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("../table.txt")));
+        String line;
+        int counter = 0;
+        while ((line = br.readLine()) != null) {
+            queue.put(Arrays.asList(line.split("\t")));
+            counter++;
+            if (counter == BENCHMARK_RECORD_LIMIT) {
+                break;
+            }
+        }
+        queue.put(Collections.emptyList());
+    }
+
+    private void loadDataFromRandom(LinkedBlockingQueue queue) throws IOException, InterruptedException {
+        queue.put(Collections.emptyList());
+    }
+
+
+    @Test
+    public void test() throws Exception {
+        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        final CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+        final CubeInstance cube = cubeManager.getCube(CUBE_NAME);
+        final CubeSegment cubeSegment = cube.getFirstSegment();
+
+        LinkedBlockingQueue queue = new LinkedBlockingQueue<List<String>>();
+
+        InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube, getDictionaryMap(cubeSegment), new ConsoleGTRecordWriter());
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        Future<?> future = executorService.submit(cubeBuilder);
+        loadDataFromLocalFile(queue);
+        future.get();
+        logger.info("stream build finished");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/86742593/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/PrintOutStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/PrintOutStreamBuilder.java b/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/PrintOutStreamBuilder.java
deleted file mode 100644
index b3d7742..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/PrintOutStreamBuilder.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.invertedindex;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.streaming.JsonStreamParser;
-import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.StreamBuilder;
-
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-
-/**
- */
-public class PrintOutStreamBuilder extends StreamBuilder {
-
-    private final List<TblColRef> allColumns;
-
-    public PrintOutStreamBuilder(BlockingQueue<StreamMessage> streamMessageQueue, int sliceSize, List<TblColRef> allColumns) {
-        super(streamMessageQueue, sliceSize);
-        setStreamParser(new JsonStreamParser(allColumns));
-        this.allColumns = allColumns;
-    }
-
-    @Override
-    protected void build(List<StreamMessage> streamsToBuild) throws Exception {
-        for (StreamMessage streamMessage : streamsToBuild) {
-            final List<String> row = getStreamParser().parse(streamMessage);
-            System.out.println("offset:" + streamMessage.getOffset() + " " + StringUtils.join(row, ","));
-        }
-    }
-
-    @Override
-    protected void onStop() {
-
-    }
-}


[2/2] incubator-kylin git commit: streaming cubing

Posted by qh...@apache.org.
streaming cubing


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/86742593
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/86742593
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/86742593

Branch: refs/heads/streaming-cubing
Commit: 8674259385833a330b6d9514ab2f5e44b84fadbd
Parents: d6c2576
Author: qianhao.zhou <qi...@ebay.com>
Authored: Fri May 22 17:13:25 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Thu May 28 15:35:48 2015 +0800

----------------------------------------------------------------------
 .../job/hadoop/cubev2/IGTRecordWriter.java      |  11 -
 .../job/hadoop/cubev2/InMemCubeBuilder.java     | 567 -------------------
 .../job/hadoop/cubev2/InMemCuboidMapper.java    |   1 +
 .../hadoop/cubev2/MapContextGTRecordWriter.java |   1 +
 .../kylin/job/streaming/CubeStreamBuilder.java  | 397 +++++++++++++
 .../kylin/job/BuildCubeWithStreamTest.java      |   4 +-
 .../apache/kylin/job/InMemCubeBuilderTest.java  |   4 +-
 .../cubev2/InMemCubeBuilderBenchmarkTest.java   | 119 ----
 .../job/streaming/CubeStreamBuilderTest.java    |  76 +++
 .../apache/kylin/streaming/StreamBuilder.java   |  13 +-
 .../kylin/streaming/cube/IGTRecordWriter.java   |  11 +
 .../kylin/streaming/cube/InMemCubeBuilder.java  | 567 +++++++++++++++++++
 .../invertedindex/IIStreamBuilder.java          |  15 +-
 .../cube/InMemCubeBuilderBenchmarkTest.java     | 117 ++++
 .../invertedindex/PrintOutStreamBuilder.java    |  70 ---
 15 files changed, 1195 insertions(+), 778 deletions(-)
----------------------------------------------------------------------
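
For orientation: the core of this change moves InMemCubeBuilder (and IGTRecordWriter) out of job/hadoop/cubev2 into the new streaming/cube package so the builder can be fed from a stream rather than only from a MapReduce mapper, and adds CubeStreamBuilder to drive it. The fragment below is a minimal sketch of that wiring as it appears in the new tests: parsed rows are put on a BlockingQueue, the builder runs on an executor, and every aggregated cuboid record is handed to an IGTRecordWriter. The constructor and writer signatures are taken from the diffs in this commit; loadCube(), loadDictionaries() and the parsedRows argument are hypothetical placeholders for metadata setup and input data.

    // Sketch only -- loadCube(), loadDictionaries() and parsedRows are placeholders.
    void buildFromStream(List<List<String>> parsedRows) throws Exception {
        BlockingQueue<List<String>> queue = new LinkedBlockingQueue<List<String>>();
        IGTRecordWriter writer = new IGTRecordWriter() {
            @Override
            public void write(Long cuboidId, GTRecord record) throws IOException {
                // consume one aggregated row of the given cuboid, e.g. turn it into an HBase Put
            }
        };
        InMemCubeBuilder builder = new InMemCubeBuilder(queue, loadCube(), loadDictionaries(), writer);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<?> done = executor.submit(builder);           // InMemCubeBuilder is a Runnable
        for (List<String> row : parsedRows) {
            queue.put(row);                                   // one parsed flat-table row per element
        }
        queue.put(Collections.<String>emptyList());           // an empty list marks end of stream
        done.get();                                           // block until all cuboids are written
        executor.shutdown();
    }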


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/86742593/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/IGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/IGTRecordWriter.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/IGTRecordWriter.java
deleted file mode 100644
index cccc995..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/IGTRecordWriter.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package org.apache.kylin.job.hadoop.cubev2;
-
-import org.apache.kylin.storage.gridtable.GTRecord;
-
-import java.io.IOException;
-
-/**
- */
-public interface IGTRecordWriter {
-    void write(Long cuboidId, GTRecord record) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/86742593/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java
deleted file mode 100644
index 56989a6..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java
+++ /dev/null
@@ -1,567 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-package org.apache.kylin.job.hadoop.cubev2;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.cuboid.CuboidScheduler;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.cube.CubeGridTable;
-import org.apache.kylin.storage.gridtable.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- */
-public class InMemCubeBuilder implements Runnable {
-
-    private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder.class);
-    private static final int DEFAULT_TIMEOUT = 25;
-
-    private BlockingQueue<List<String>> queue;
-    private CubeDesc desc = null;
-    private long baseCuboidId;
-    private CuboidScheduler cuboidScheduler = null;
-    private Map<TblColRef, Dictionary<?>> dictionaryMap = null;
-    private CubeJoinedFlatTableDesc intermediateTableDesc;
-    private MeasureCodec measureCodec;
-    private String[] metricsAggrFuncs = null;
-    private Map<Integer, Integer> dependentMeasures = null; // key: index of Measure which depends on another measure; value: index of Measure which is depended on;
-    public static final LongWritable ONE = new LongWritable(1l);
-    private int[] hbaseMeasureRefIndex;
-    private MeasureDesc[] measureDescs;
-    private int measureCount;
-
-    protected IGTRecordWriter gtRecordWriter;
-
-
-    /**
-     * @param queue
-     * @param cube
-     * @param dictionaryMap
-     * @param gtRecordWriter
-     */
-    public InMemCubeBuilder(BlockingQueue<List<String>> queue, CubeInstance cube, Map<TblColRef, Dictionary<?>> dictionaryMap, IGTRecordWriter gtRecordWriter) {
-        if (dictionaryMap == null || dictionaryMap.isEmpty()) {
-            throw new IllegalArgumentException();
-        }
-        this.queue = queue;
-        this.desc = cube.getDescriptor();
-        this.cuboidScheduler = new CuboidScheduler(desc);
-        this.dictionaryMap = dictionaryMap;
-        this.gtRecordWriter = gtRecordWriter;
-        this.baseCuboidId = Cuboid.getBaseCuboidId(desc);
-        this.intermediateTableDesc = new CubeJoinedFlatTableDesc(desc, null);
-        this.measureCodec = new MeasureCodec(desc.getMeasures());
-
-        Map<String, Integer> measureIndexMap = Maps.newHashMap();
-        List<String> metricsAggrFuncsList = Lists.newArrayList();
-        measureCount = desc.getMeasures().size();
-
-        List<MeasureDesc> measureDescsList = Lists.newArrayList();
-        hbaseMeasureRefIndex = new int[measureCount];
-        int measureRef = 0;
-        for (HBaseColumnFamilyDesc familyDesc : desc.getHbaseMapping().getColumnFamily()) {
-            for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
-                for (MeasureDesc measure : hbaseColDesc.getMeasures()) {
-                    for (int j = 0; j < measureCount; j++) {
-                        if (desc.getMeasures().get(j).equals(measure)) {
-                            measureDescsList.add(measure);
-                            hbaseMeasureRefIndex[measureRef] = j;
-                            break;
-                        }
-                    }
-                    measureRef++;
-                }
-            }
-        }
-
-        for (int i = 0; i < measureCount; i++) {
-            MeasureDesc measureDesc = measureDescsList.get(i);
-            metricsAggrFuncsList.add(measureDesc.getFunction().getExpression());
-            measureIndexMap.put(measureDesc.getName(), i);
-        }
-        this.metricsAggrFuncs = metricsAggrFuncsList.toArray(new String[metricsAggrFuncsList.size()]);
-
-        this.dependentMeasures = Maps.newHashMap();
-        for (int i = 0; i < measureCount; i++) {
-            String depMsrRef = measureDescsList.get(i).getDependentMeasureRef();
-            if (depMsrRef != null) {
-                int index = measureIndexMap.get(depMsrRef);
-                dependentMeasures.put(i, index);
-            }
-        }
-
-        this.measureDescs = desc.getMeasures().toArray(new MeasureDesc[measureCount]);
-    }
-
-
-    private GridTable newGridTableByCuboidID(long cuboidID, boolean memStore) {
-        GTInfo info = CubeGridTable.newGTInfo(desc, cuboidID, dictionaryMap);
-        GTComboStore store = new GTComboStore(info, memStore);
-        GridTable gridTable = new GridTable(info, store);
-        return gridTable;
-    }
-
-    private GridTable aggregateCuboid(GridTable parentCuboid, long parentCuboidId, long cuboidId) throws IOException {
-        Pair<BitSet, BitSet> columnBitSets = getDimensionAndMetricColumnBitSet(parentCuboidId);
-        BitSet parentDimensions = columnBitSets.getFirst();
-        BitSet measureColumns = columnBitSets.getSecond();
-        BitSet childDimensions = (BitSet) parentDimensions.clone();
-
-        long mask = Long.highestOneBit(parentCuboidId);
-        long childCuboidId = cuboidId;
-        long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(parentCuboidId);
-        int index = 0;
-        for (int i = 0; i < parentCuboidIdActualLength; i++) {
-            if ((mask & parentCuboidId) > 0) {
-                if ((mask & childCuboidId) == 0) {
-                    // this dim will be aggregated
-                    childDimensions.set(index, false);
-                }
-                index++;
-            }
-            mask = mask >> 1;
-        }
-
-        return scanAndAggregateGridTable(parentCuboid, cuboidId, childDimensions, measureColumns);
-
-    }
-
-    private GridTable scanAndAggregateGridTable(GridTable gridTable, long cuboidId, BitSet aggregationColumns, BitSet measureColumns) throws IOException {
-        GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, aggregationColumns, measureColumns, metricsAggrFuncs, null);
-        IGTScanner scanner = gridTable.scan(req);
-        GridTable newGridTable = newGridTableByCuboidID(cuboidId, true);
-        GTBuilder builder = newGridTable.rebuild();
-
-        BitSet allNeededColumns = new BitSet();
-        allNeededColumns.or(aggregationColumns);
-        allNeededColumns.or(measureColumns);
-
-        GTRecord newRecord = new GTRecord(newGridTable.getInfo());
-        int counter = 0;
-        ByteArray byteArray = new ByteArray(8);
-        ByteBuffer byteBuffer = ByteBuffer.allocate(8);
-        try {
-            BitSet dependentMetrics = new BitSet(allNeededColumns.cardinality());
-            for (Integer i : dependentMeasures.keySet()) {
-                dependentMetrics.set((allNeededColumns.cardinality() - measureCount + dependentMeasures.get(i)));
-            }
-
-            Object[] hllObjects = new Object[dependentMeasures.keySet().size()];
-
-            for (GTRecord record : scanner) {
-                counter++;
-                for (int i = allNeededColumns.nextSetBit(0), index = 0; i >= 0; i = allNeededColumns.nextSetBit(i + 1), index++) {
-                    newRecord.set(index, record.get(i));
-                }
-
-                if(dependentMeasures.size() > 0) {
-                    // update measures which have 'dependent_measure_ref'
-                    newRecord.getValues(dependentMetrics, hllObjects);
-
-                    for (Integer i : dependentMeasures.keySet()) {
-                        for (int index = 0, c = dependentMetrics.nextSetBit(0); c >= 0; index++, c = dependentMetrics.nextSetBit(c + 1)) {
-                            if (c == allNeededColumns.cardinality() - measureCount + dependentMeasures.get(i)) {
-                                assert hllObjects[index] instanceof HyperLogLogPlusCounter; // currently only HLL is allowed
-
-                                byteBuffer.clear();
-                                BytesUtil.writeVLong(((HyperLogLogPlusCounter) hllObjects[index]).getCountEstimate(), byteBuffer);
-                                byteArray.set(byteBuffer.array(), 0, byteBuffer.position());
-                                newRecord.set(allNeededColumns.cardinality() - measureCount + i, byteArray);
-                            }
-                        }
-
-                    }
-                }
-
-                builder.write(newRecord);
-            }
-        } finally {
-            builder.close();
-        }
-        logger.info("Cuboid " + cuboidId + " has rows: " + counter);
-
-        return newGridTable;
-    }
-
-    private Pair<BitSet, BitSet> getDimensionAndMetricColumnBitSet(long cuboidId) {
-        BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
-        BitSet dimension = new BitSet();
-        dimension.set(0, bitSet.cardinality());
-        BitSet metrics = new BitSet();
-        metrics.set(bitSet.cardinality(), bitSet.cardinality() + this.measureCount);
-        return new Pair<BitSet, BitSet>(dimension, metrics);
-    }
-
-    private Object[] buildKey(List<String> row) {
-        int keySize = intermediateTableDesc.getRowKeyColumnIndexes().length;
-        Object[] key = new Object[keySize];
-
-        for (int i = 0; i < keySize; i++) {
-            key[i] = row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]);
-        }
-
-        return key;
-    }
-
-    private Object[] buildValue(List<String> row) {
-
-        Object[] values = new Object[measureCount];
-        MeasureDesc measureDesc = null;
-
-        for (int position = 0; position < hbaseMeasureRefIndex.length; position++) {
-            int i = hbaseMeasureRefIndex[position];
-            measureDesc = measureDescs[i];
-
-            Object value = null;
-            int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[i];
-            FunctionDesc function = desc.getMeasures().get(i).getFunction();
-            if (function.isCount() || function.isHolisticCountDistinct()) {
-                // note for holistic count distinct, this value will be ignored
-                value = ONE;
-            } else if (flatTableIdx == null) {
-                value = measureCodec.getSerializer(i).valueOf(measureDesc.getFunction().getParameter().getValue());
-            } else if (flatTableIdx.length == 1) {
-                value = measureCodec.getSerializer(i).valueOf(Bytes.toBytes(row.get(flatTableIdx[0])));
-            } else {
-
-                byte[] result = null;
-                for (int x = 0; x < flatTableIdx.length; x++) {
-                    byte[] split = Bytes.toBytes(row.get(flatTableIdx[x]));
-                    if (result == null) {
-                        result = Arrays.copyOf(split, split.length);
-                    } else {
-                        byte[] newResult = new byte[result.length + split.length];
-                        System.arraycopy(result, 0, newResult, 0, result.length);
-                        System.arraycopy(split, 0, newResult, result.length, split.length);
-                        result = newResult;
-                    }
-                }
-                value = measureCodec.getSerializer(i).valueOf(result);
-            }
-            values[position] = value;
-        }
-        return values;
-    }
-
-
-    @Override
-    public void run() {
-        try {
-            logger.info("Create base cuboid " + baseCuboidId);
-            final GridTable baseCuboidGT = newGridTableByCuboidID(baseCuboidId, false);
-
-            GTBuilder baseGTBuilder = baseCuboidGT.rebuild();
-            final GTRecord baseGTRecord = new GTRecord(baseCuboidGT.getInfo());
-
-            IGTScanner queueScanner = new IGTScanner() {
-
-                @Override
-                public Iterator<GTRecord> iterator() {
-                    return new Iterator<GTRecord>() {
-
-                        List<String> currentObject = null;
-
-                        @Override
-                        public boolean hasNext() {
-                            try {
-                                currentObject = queue.take();
-                            } catch (InterruptedException e) {
-                                throw new RuntimeException(e);
-                            }
-                            return currentObject != null && currentObject.size() > 0;
-                        }
-
-                        @Override
-                        public GTRecord next() {
-                            if (currentObject.size() == 0)
-                                throw new IllegalStateException();
-
-                            buildGTRecord(currentObject, baseGTRecord);
-                            return baseGTRecord;
-                        }
-
-                        @Override
-                        public void remove() {
-                            throw new UnsupportedOperationException();
-                        }
-                    };
-                }
-
-                @Override
-                public void close() throws IOException {
-                }
-
-                @Override
-                public GTInfo getInfo() {
-                    return baseCuboidGT.getInfo();
-                }
-
-                @Override
-                public int getScannedRowCount() {
-                    return 0;
-                }
-
-                @Override
-                public int getScannedRowBlockCount() {
-                    return 0;
-                }
-            };
-
-            Pair<BitSet, BitSet> dimensionMetricsBitSet = getDimensionAndMetricColumnBitSet(baseCuboidId);
-            GTScanRequest req = new GTScanRequest(baseCuboidGT.getInfo(), null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null);
-            IGTScanner aggregationScanner = new GTAggregateScanner(queueScanner, req);
-
-            int counter = 0;
-            for (GTRecord r : aggregationScanner) {
-                baseGTBuilder.write(r);
-                counter++;
-            }
-            baseGTBuilder.close();
-            aggregationScanner.close();
-
-            logger.info("Base cuboid has " + counter + " rows;");
-            SimpleGridTableTree tree = new SimpleGridTableTree();
-            tree.data = baseCuboidGT;
-            tree.id = baseCuboidId;
-            tree.parent = null;
-            if (counter > 0) {
-                List<Long> children = cuboidScheduler.getSpanningCuboid(baseCuboidId);
-                Collections.sort(children);
-                for (Long childId : children) {
-                    createNDCuboidGT(tree, baseCuboidId, childId);
-                }
-            }
-            outputGT(baseCuboidId, baseCuboidGT);
-            dropStore(baseCuboidGT);
-
-        } catch (IOException e) {
-            logger.error("Fail to build cube", e);
-            throw new RuntimeException(e);
-        }
-
-    }
-
-    private void buildGTRecord(List<String> row, GTRecord record) {
-
-        Object[] dimensions = buildKey(row);
-        Object[] metricsValues = buildValue(row);
-        Object[] recordValues = new Object[dimensions.length + metricsValues.length];
-        System.arraycopy(dimensions, 0, recordValues, 0, dimensions.length);
-        System.arraycopy(metricsValues, 0, recordValues, dimensions.length, metricsValues.length);
-        record.setValues(recordValues);
-    }
-
-    private boolean gc(TreeNode<GridTable> parentNode) {
-        final List<TreeNode<GridTable>> gridTables = parentNode.getAncestorList();
-        logger.info("trying to select node to flush to disk, from:" + StringUtils.join(",", gridTables));
-        for (TreeNode<GridTable> gridTable : gridTables) {
-            final GTComboStore store = (GTComboStore) gridTable.data.getStore();
-            if (store.memoryUsage() > 0) {
-                logger.info("cuboid id:" + gridTable.id + " flush to disk");
-                long t = System.currentTimeMillis();
-                store.switchToDiskStore();
-                logger.info("switch to disk store cost:" + (System.currentTimeMillis() - t) + "ms");
-                waitForGc();
-                return true;
-            }
-        }
-        logger.warn("all ancestor nodes of " + parentNode.id + " has been flushed to disk");
-        return false;
-
-    }
-
-    private GridTable createChildCuboid(final GridTable parentCuboid, final long parentCuboidId, final long cuboidId) {
-        final ExecutorService executorService = Executors.newSingleThreadExecutor();
-        final Future<GridTable> task = executorService.submit(new Callable<GridTable>() {
-            @Override
-            public GridTable call() throws Exception {
-                return aggregateCuboid(parentCuboid, parentCuboidId, cuboidId);
-            }
-        });
-        try {
-            final GridTable gridTable = task.get(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
-            return gridTable;
-        } catch (InterruptedException e) {
-            throw new RuntimeException("this should not happen", e);
-        } catch (ExecutionException e) {
-            if (e.getCause() instanceof OutOfMemoryError) {
-                logger.warn("Future.get() OutOfMemory, stop the thread");
-            } else {
-                throw new RuntimeException("this should not happen", e);
-            }
-        } catch (TimeoutException e) {
-            logger.warn("Future.get() timeout, stop the thread");
-        }
-        logger.info("shutdown executor service");
-        final List<Runnable> runnables = executorService.shutdownNow();
-        try {
-            executorService.awaitTermination(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
-            waitForGc();
-        } catch (InterruptedException e) {
-            throw new RuntimeException("this should not happen", e);
-        }
-        return null;
-
-    }
-
-    private void waitForGc() {
-        System.gc();
-        logger.info("wait 5 seconds for gc");
-        try {
-            Thread.sleep(5000);
-        } catch (InterruptedException e) {
-            throw new RuntimeException("should not happen", e);
-        }
-    }
-
-    private void createNDCuboidGT(SimpleGridTableTree parentNode, long parentCuboidId, long cuboidId) throws IOException {
-
-        long startTime = System.currentTimeMillis();
-
-//        GTComboStore parentStore = (GTComboStore) parentNode.data.getStore();
-//        if (parentStore.memoryUsage() <= 0) {
-//            long t = System.currentTimeMillis();
-//            parentStore.switchToMemStore();
-//            logger.info("node " + parentNode.id + " switch to mem store cost:" + (System.currentTimeMillis() - t) + "ms");
-//        }
-
-        GridTable currentCuboid;
-        while (true) {
-            logger.info("Calculating cuboid " + cuboidId + " from parent " + parentCuboidId);
-            currentCuboid = createChildCuboid(parentNode.data, parentCuboidId, cuboidId);
-            if (currentCuboid != null) {
-                break;
-            } else {
-                logger.warn("create child cuboid:" + cuboidId + " from parent:" + parentCuboidId + " failed, prepare to gc");
-                if (gc(parentNode)) {
-                    continue;
-                } else {
-                    logger.warn("all parent node has been flushed into disk, memory is still insufficient");
-                    throw new RuntimeException("all parent node has been flushed into disk, memory is still insufficient");
-                }
-            }
-        }
-        SimpleGridTableTree node = new SimpleGridTableTree();
-        node.parent = parentNode;
-        node.data = currentCuboid;
-        node.id = cuboidId;
-        parentNode.children.add(node);
-
-        logger.info("Cuboid " + cuboidId + " build takes " + (System.currentTimeMillis() - startTime) + "ms");
-
-        List<Long> children = cuboidScheduler.getSpanningCuboid(cuboidId);
-        if (!children.isEmpty()) {
-            Collections.sort(children); // sort cuboids
-            for (Long childId : children) {
-                createNDCuboidGT(node, cuboidId, childId);
-            }
-        }
-
-
-        //output the grid table
-        outputGT(cuboidId, currentCuboid);
-        dropStore(currentCuboid);
-        parentNode.children.remove(node);
-        if (parentNode.children.size() > 0) {
-            logger.info("cuboid:" + cuboidId + " has finished, parent node:" + parentNode.id + " need to switch to mem store");
-            ((GTComboStore) parentNode.data.getStore()).switchToMemStore();
-        }
-    }
-
-    private void dropStore(GridTable gt) throws IOException {
-        ((GTComboStore) gt.getStore()).drop();
-    }
-
-
-    private void outputGT(Long cuboidId, GridTable gridTable) throws IOException {
-        long startTime = System.currentTimeMillis();
-        GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
-        IGTScanner scanner = gridTable.scan(req);
-        for (GTRecord record : scanner) {
-            this.gtRecordWriter.write(cuboidId, record);
-        }
-        logger.info("Cuboid" + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
-    }
-
-    private static class TreeNode<T> {
-        T data;
-        long id;
-        TreeNode<T> parent;
-        List<TreeNode<T>> children = Lists.newArrayList();
-
-        List<TreeNode<T>> getAncestorList() {
-            ArrayList<TreeNode<T>> result = Lists.newArrayList();
-            TreeNode<T> parent = this;
-            while (parent != null) {
-                result.add(parent);
-                parent = parent.parent;
-            }
-            return Lists.reverse(result);
-        }
-
-        @Override
-        public String toString() {
-            return id + "";
-        }
-    }
-
-    private static class SimpleGridTableTree extends TreeNode<GridTable> {
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/86742593/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
index 4454e43..4efff16 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
@@ -20,6 +20,7 @@ import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.streaming.cube.InMemCubeBuilder;
 
 import java.io.IOException;
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/86742593/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
index b8e1ffe..283bed6 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
@@ -16,6 +16,7 @@ import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.streaming.cube.IGTRecordWriter;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/86742593/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
new file mode 100644
index 0000000..e0cd1ae
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
@@ -0,0 +1,397 @@
+package org.apache.kylin.job.streaming;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.persistence.HBaseConnection;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.dict.lookup.ReadableTable;
+import org.apache.kylin.dict.lookup.TableSignature;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsReducer;
+import org.apache.kylin.job.hadoop.cubev2.InMemKeyValueCreator;
+import org.apache.kylin.job.hadoop.hbase.CreateHTableJob;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.cube.CuboidToGridTableMapping;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.streaming.StreamBuilder;
+import org.apache.kylin.streaming.StreamMessage;
+import org.apache.kylin.streaming.cube.IGTRecordWriter;
+import org.apache.kylin.streaming.cube.InMemCubeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ */
+public class CubeStreamBuilder extends StreamBuilder {
+
+    private static final Logger logger = LoggerFactory.getLogger(CubeStreamBuilder.class);
+
+    private final CubeManager cubeManager;
+    private final String cubeName;
+    private final KylinConfig kylinConfig;
+    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+
+    public CubeStreamBuilder(BlockingQueue<StreamMessage> streamMessageQueue, String cubeName) {
+        super(streamMessageQueue);
+        this.kylinConfig = KylinConfig.getInstanceFromEnv();
+        this.cubeManager = CubeManager.getInstance(kylinConfig);
+        this.cubeName = cubeName;
+    }
+
+    @Override
+    protected void build(List<StreamMessage> streamMessages) throws Exception {
+        if (CollectionUtils.isEmpty(streamMessages)) {
+            logger.info("nothing to build, skip to next iteration");
+            return;
+        }
+        final List<List<String>> parsedStreamMessages = parseStream(streamMessages);
+        long startOffset = streamMessages.get(0).getOffset();
+        long endOffset = streamMessages.get(streamMessages.size() - 1).getOffset();
+        LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue<List<String>>(parsedStreamMessages);
+        blockingQueue.put(Collections.<String>emptyList());
+
+        final CubeInstance cubeInstance = cubeManager.getCube(cubeName);
+        final CubeDesc cubeDesc = cubeInstance.getDescriptor();
+        final CubeSegment cubeSegment = cubeManager.appendSegments(cubeManager.getCube(cubeName), System.currentTimeMillis());
+        final Map<Long, HyperLogLogPlusCounter> samplingResult = sampling(cubeInstance.getDescriptor(), parsedStreamMessages);
+
+        final Configuration conf = HadoopUtil.getCurrentConfiguration();
+        final String outputPath = "/tmp/kylin/cuboidstatistics/" + UUID.randomUUID().toString();
+        FactDistinctColumnsReducer.writeCuboidStatistics(conf, outputPath, samplingResult, 100);
+        ResourceStore.getStore(kylinConfig).putResource(cubeSegment.getStatisticsResourcePath(), FileSystem.get(conf).open(new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION)), 0);
+
+        final Map<TblColRef, Dictionary<?>> dictionaryMap = buildDictionary(cubeInstance.getDescriptor(), parsedStreamMessages);
+        writeDictionary(cubeSegment, dictionaryMap, startOffset, endOffset);
+
+        final HTableInterface hTable = createHTable(cubeSegment);
+
+        final CubeStreamRecordWriter gtRecordWriter = new CubeStreamRecordWriter(cubeDesc, hTable);
+        InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(blockingQueue, cubeInstance,
+                dictionaryMap, gtRecordWriter);
+
+        executorService.submit(inMemCubeBuilder).get();
+        gtRecordWriter.flush();
+        commitSegment(cubeSegment);
+    }
+
+    private void writeDictionary(CubeSegment cubeSegment, Map<TblColRef, Dictionary<?>> dictionaryMap, long startOffset, long endOffset) {
+        for (Map.Entry<TblColRef, Dictionary<?>> entry : dictionaryMap.entrySet()) {
+            final TblColRef tblColRef = entry.getKey();
+            final Dictionary<?> dictionary = entry.getValue();
+            TableSignature signature = new TableSignature();
+            signature.setLastModifiedTime(System.currentTimeMillis());
+            signature.setPath(String.format("streaming_%s_%s", startOffset, endOffset));
+            signature.setSize(endOffset - startOffset);
+            DictionaryInfo dictInfo = new DictionaryInfo(tblColRef.getTable(),
+                    tblColRef.getName(),
+                    tblColRef.getColumnDesc().getZeroBasedIndex(),
+                    tblColRef.getDatatype(),
+                    signature,
+                    ReadableTable.DELIM_AUTO);
+            logger.info("writing dictionary for TblColRef:" + tblColRef.toString());
+            DictionaryManager dictionaryManager = DictionaryManager.getInstance(kylinConfig);
+            try {
+                cubeSegment.putDictResPath(tblColRef, dictionaryManager.trySaveNewDict(dictionary, dictInfo).getResourcePath());
+            } catch (IOException e) {
+                logger.error("error save dictionary for column:" + tblColRef, e);
+                throw new RuntimeException("error save dictionary for column:" + tblColRef, e);
+            }
+        }
+    }
+
+    private class CubeStreamRecordWriter implements IGTRecordWriter {
+        final List<InMemKeyValueCreator> keyValueCreators;
+        final int nColumns;
+        final HTableInterface hTable;
+        private final ByteBuffer byteBuffer;
+        private final CubeDesc cubeDesc;
+        private List<Put> puts = Lists.newArrayList();
+
+        private CubeStreamRecordWriter(CubeDesc cubeDesc, HTableInterface hTable) {
+            this.keyValueCreators = Lists.newArrayList();
+            this.cubeDesc = cubeDesc;
+            int startPosition = 0;
+            for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
+                for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
+                    keyValueCreators.add(new InMemKeyValueCreator(colDesc, startPosition));
+                    startPosition += colDesc.getMeasures().length;
+                }
+            }
+            this.nColumns = keyValueCreators.size();
+            this.hTable = hTable;
+            this.byteBuffer = ByteBuffer.allocate(1<<20);
+        }
+
+        private byte[] copy(byte[] array, int offset, int length) {
+            byte[] result = new byte[length];
+            System.arraycopy(array, offset, result, 0, length);
+            return result;
+        }
+
+        private ByteBuffer createKey(Long cuboidId, GTRecord record) {
+            byteBuffer.clear();
+            byteBuffer.put(Bytes.toBytes(cuboidId));
+            final int cardinality = BitSet.valueOf(new long[]{cuboidId}).cardinality();
+            for (int i = 0; i < cardinality; i++) {
+                final ByteArray byteArray = record.get(i);
+                byteBuffer.put(byteArray.array(), byteArray.offset(), byteArray.length());
+            }
+            return byteBuffer;
+        }
+
+        @Override
+        public void write(Long cuboidId, GTRecord record) throws IOException {
+            final ByteBuffer key = createKey(cuboidId, record);
+            final CuboidToGridTableMapping mapping = new CuboidToGridTableMapping(Cuboid.findById(cubeDesc, cuboidId));
+            final BitSet bitSet = new BitSet();
+            bitSet.set(mapping.getDimensionCount(), mapping.getColumnCount());
+            for (int i = 0; i < nColumns; i++) {
+                final KeyValue keyValue = keyValueCreators.get(i).create(key.array(), 0, key.position(), record.getValues(bitSet, new Object[bitSet.cardinality()]));
+                final Put put = new Put(copy(key.array(), 0, key.position()));
+                byte[] family = copy(keyValue.getFamilyArray(), keyValue.getFamilyOffset(), keyValue.getFamilyLength());
+                byte[] qualifier = copy(keyValue.getQualifierArray(), keyValue.getQualifierOffset(), keyValue.getQualifierLength());
+                byte[] value = copy(keyValue.getValueArray(), keyValue.getValueOffset(), keyValue.getValueLength());
+                put.add(family, qualifier, value);
+                puts.add(put);
+            }
+            if (puts.size() >= batchSize()) {
+                flush();
+            }
+        }
+
+        public final void flush() {
+            try {
+                if (!puts.isEmpty()) {
+                    long t = System.currentTimeMillis();
+                    if (hTable != null) {
+                        hTable.put(puts);
+                        hTable.flushCommits();
+                    }
+                    logger.info("commit total " + puts.size() + " puts, totally cost:" + (System.currentTimeMillis() - t) + "ms");
+                    puts.clear();
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private Map<Integer, TblColRef> getTblColRefMap(CubeDesc cubeDesc) {
+        final List<TblColRef> columns = cubeDesc.listDimensionColumnsExcludingDerived();
+        final HashMap<Integer, TblColRef> result = Maps.newHashMap();
+        for (int i = 0; i < columns.size(); i++) {
+            result.put(i, columns.get(i));
+        }
+        return result;
+    }
+
+    private Map<TblColRef, Dictionary<?>> buildDictionary(CubeDesc cubeDesc, List<List<String>> recordList) throws IOException {
+        final Map<Integer, TblColRef> tblColRefMap = getTblColRefMap(cubeDesc);
+        HashMap<TblColRef, Dictionary<?>> result = Maps.newHashMap();
+
+        HashMultimap<TblColRef, String> valueMap = HashMultimap.create();
+        for (List<String> row : recordList) {
+            for (int i = 0; i < row.size(); i++) {
+                String cell = row.get(i);
+                if (tblColRefMap.containsKey(i)) {
+                    valueMap.put(tblColRefMap.get(i), cell);
+                }
+            }
+        }
+        for (TblColRef tblColRef : valueMap.keySet()) {
+            final Collection<byte[]> bytes = Collections2.transform(valueMap.get(tblColRef), new Function<String, byte[]>() {
+                @Nullable
+                @Override
+                public byte[] apply(String input) {
+                    return input == null ? null : input.getBytes();
+                }
+            });
+            final Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(tblColRef.getType(), bytes);
+            result.put(tblColRef, dict);
+        }
+        return result;
+    }
+
+    private Map<Long, HyperLogLogPlusCounter> sampling(CubeDesc cubeDesc, List<List<String>> streams) {
+        CubeJoinedFlatTableDesc intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
+        final int rowkeyLength = cubeDesc.getRowkey().getRowKeyColumns().length;
+        final List<Long> allCuboidIds = getAllCuboidIds(cubeDesc);
+        final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+        final Map<Long, Integer[]> allCuboidsBitSet = Maps.newHashMap();
+
+
+        Lists.transform(allCuboidIds, new Function<Long, Integer[]>() {
+            @Nullable
+            @Override
+            public Integer[] apply(@Nullable Long cuboidId) {
+                BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
+                Integer[] result = new Integer[bitSet.cardinality()];
+
+                long mask = Long.highestOneBit(baseCuboidId);
+                int position = 0;
+                for (int i = 0; i < rowkeyLength; i++) {
+                    if ((mask & cuboidId) > 0) {
+                        result[position] = i;
+                        position++;
+                    }
+                    mask = mask >> 1;
+                }
+                return result;
+            }
+        });
+        final Map<Long, HyperLogLogPlusCounter> result = Maps.newHashMapWithExpectedSize(allCuboidIds.size());
+        for (Long cuboidId : allCuboidIds) {
+            result.put(cuboidId, new HyperLogLogPlusCounter(14));
+            BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
+            Integer[] cuboidBitSet = new Integer[bitSet.cardinality()];
+
+            long mask = Long.highestOneBit(baseCuboidId);
+            int position = 0;
+            for (int i = 0; i < rowkeyLength; i++) {
+                if ((mask & cuboidId) > 0) {
+                    cuboidBitSet[position] = i;
+                    position++;
+                }
+                mask = mask >> 1;
+            }
+            allCuboidsBitSet.put(cuboidId, cuboidBitSet);
+        }
+
+        HashFunction hf = Hashing.murmur3_32();
+        ByteArray[] row_hashcodes = new ByteArray[rowkeyLength];
+        for (int i = 0; i < rowkeyLength; i++) {
+            row_hashcodes[i] = new ByteArray();
+        }
+        for (List<String> row : streams) {
+            //generate hash for each row key column
+            for (int i = 0; i < rowkeyLength; i++) {
+                Hasher hc = hf.newHasher();
+                final String cell = row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]);
+                if (cell != null) {
+                    row_hashcodes[i].set(hc.putString(cell).hash().asBytes());
+                } else {
+                    row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
+                }
+            }
+
+            for (Map.Entry<Long, HyperLogLogPlusCounter> longHyperLogLogPlusCounterEntry : result.entrySet()) {
+                Long cuboidId = longHyperLogLogPlusCounterEntry.getKey();
+                HyperLogLogPlusCounter counter = longHyperLogLogPlusCounterEntry.getValue();
+                Hasher hc = hf.newHasher();
+                final Integer[] cuboidBitSet = allCuboidsBitSet.get(cuboidId);
+                for (int position = 0; position < cuboidBitSet.length; position++) {
+                    hc.putBytes(row_hashcodes[cuboidBitSet[position]].array());
+                }
+                counter.add(hc.hash().asBytes());
+            }
+        }
+        return result;
+    }
+
+    private void commitSegment(CubeSegment cubeSegment) throws IOException {
+        cubeSegment.setStatus(SegmentStatusEnum.READY);
+        CubeManager.getInstance(kylinConfig).updateCube(cubeSegment.getCubeInstance(), true);
+    }
+
+    private List<Long> getAllCuboidIds(CubeDesc cubeDesc) {
+        final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+        List<Long> result = Lists.newArrayList();
+        CuboidScheduler cuboidScheduler = new CuboidScheduler(cubeDesc);
+        getSubCuboidIds(cuboidScheduler, baseCuboidId, result);
+        return result;
+    }
+
+    private void getSubCuboidIds(CuboidScheduler cuboidScheduler, long parentCuboidId, List<Long> result) {
+        result.add(parentCuboidId);
+        for (Long cuboidId: cuboidScheduler.getSpanningCuboid(parentCuboidId)) {
+            getSubCuboidIds(cuboidScheduler, cuboidId, result);
+        }
+    }
+
+
+    private List<List<String>> parseStream(List<StreamMessage> streamMessages) {
+        return Lists.transform(streamMessages, new Function<StreamMessage, List<String>>() {
+            @Nullable
+            @Override
+            public List<String> apply(StreamMessage input) {
+                return getStreamParser().parse(input);
+            }
+        });
+    }
+
+    private HTableInterface createHTable(final CubeSegment cubeSegment) throws Exception {
+        final String hTableName = cubeSegment.getStorageLocationIdentifier();
+        String[] args = new String[]{"-cubename", cubeName,
+                "-segmentname", cubeSegment.getName(),
+                "-input", "/empty",
+                "-htablename", hTableName,
+                "-statisticsenabled", "true"};
+        ToolRunner.run(new CreateHTableJob(), args);
+        final HTableInterface hTable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(hTableName);
+        logger.info("hTable:" + hTableName + " for segment:" + cubeSegment.getName() + " created!");
+        return hTable;
+    }
+
+    private void loadToHTable(String hTableName) throws IOException {
+        final HTableInterface table = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(hTableName);
+
+    }
+
+    @Override
+    protected void onStop() {
+
+    }
+
+    @Override
+    protected int batchInterval() {
+        return 30 * 60 * 1000;//30 min
+    }
+
+    @Override
+    protected int batchSize() {
+        return 10000;
+    }
+}
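
The CubeStreamBuilder added above assembles one cube segment per batch of stream messages: it parses the messages, estimates per-cuboid cardinality with HyperLogLog counters, builds dictionaries from the observed values, creates the segment's HTable, runs InMemCubeBuilder over the parsed rows, flushes the resulting records as HBase Puts, and finally marks the segment READY. A hedged sketch of attaching it to a message queue follows; it assumes StreamBuilder can be submitted to an executor as a Runnable that drains the queue and calls build() per batch, which this diff does not show, and the cube name and consumer are illustrative only.

    // Hedged sketch -- assumes StreamBuilder is usable as a Runnable.
    BlockingQueue<StreamMessage> messages = new LinkedBlockingQueue<StreamMessage>();
    CubeStreamBuilder cubeStreamBuilder = new CubeStreamBuilder(messages, "test_streaming_cube");
    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.submit(cubeStreamBuilder);
    // a Kafka (or other) consumer would then feed the queue, e.g.:
    //     messages.put(streamMessage);   // one StreamMessage per event, carrying its offset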

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/86742593/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
index a437bba..319d7fa 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
@@ -55,11 +55,11 @@ import org.apache.kylin.cube.model.DimensionDesc;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.DictionaryGenerator;
 import org.apache.kylin.dict.lookup.HiveTableReader;
-import org.apache.kylin.job.hadoop.cubev2.IGTRecordWriter;
-import org.apache.kylin.job.hadoop.cubev2.InMemCubeBuilder;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.streaming.cube.IGTRecordWriter;
+import org.apache.kylin.streaming.cube.InMemCubeBuilder;
 import org.junit.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/86742593/job/src/test/java/org/apache/kylin/job/InMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/InMemCubeBuilderTest.java b/job/src/test/java/org/apache/kylin/job/InMemCubeBuilderTest.java
index c4bda5f..781273d 100644
--- a/job/src/test/java/org/apache/kylin/job/InMemCubeBuilderTest.java
+++ b/job/src/test/java/org/apache/kylin/job/InMemCubeBuilderTest.java
@@ -38,10 +38,10 @@ import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.DictionaryGenerator;
 import org.apache.kylin.dict.lookup.FileTableReader;
-import org.apache.kylin.job.hadoop.cubev2.IGTRecordWriter;
-import org.apache.kylin.job.hadoop.cubev2.InMemCubeBuilder;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.streaming.cube.IGTRecordWriter;
+import org.apache.kylin.streaming.cube.InMemCubeBuilder;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/86742593/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilderBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilderBenchmarkTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilderBenchmarkTest.java
deleted file mode 100644
index f61aa66..0000000
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilderBenchmarkTest.java
+++ /dev/null
@@ -1,119 +0,0 @@
-package org.apache.kylin.job.hadoop.cubev2;
-
-import com.google.common.collect.Maps;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-
-/**
- */
-@Ignore
-public class InMemCubeBuilderBenchmarkTest extends LocalFileMetadataTestCase {
-
-    private static final Logger logger = LoggerFactory.getLogger(InMemCubeBuilderBenchmarkTest.class);
-
-    private static final int BENCHMARK_RECORD_LIMIT = 2000000;
-    private static final String CUBE_NAME = "test_kylin_cube_with_slr_1_new_segment";
-
-    @Before
-    public void setUp() throws Exception {
-        this.createTestMetadata();
-    }
-
-    @After
-    public void after() throws Exception {
-        this.cleanupTestMetadata();
-    }
-
-    private Map<TblColRef, Dictionary<?>> getDictionaryMap(CubeSegment cubeSegment) {
-        final Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
-        final CubeDesc desc = cubeSegment.getCubeDesc();
-        for (DimensionDesc dim : desc.getDimensions()) {
-            // dictionary
-            for (TblColRef col : dim.getColumnRefs()) {
-                if (desc.getRowkey().isUseDictionary(col)) {
-                    Dictionary dict = cubeSegment.getDictionary(col);
-                    if (dict == null) {
-                        throw new IllegalArgumentException("Dictionary for " + col + " was not found.");
-                    }
-                    logger.info("Dictionary for " + col + " was put into dictionary map.");
-                    dictionaryMap.put(col, cubeSegment.getDictionary(col));
-                }
-            }
-        }
-        return dictionaryMap;
-    }
-
-    private static class ConsoleGTRecordWriter implements IGTRecordWriter {
-
-        boolean verbose = false;
-
-        @Override
-        public void write(Long cuboidId, GTRecord record) throws IOException {
-            if (verbose)
-                System.out.println(record.toString());
-        }
-    }
-
-    private void loadDataFromLocalFile(LinkedBlockingQueue queue) throws IOException, InterruptedException {
-        BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("../table.txt")));
-        String line;
-        int counter = 0;
-        while ((line = br.readLine()) != null) {
-            queue.put(Arrays.asList(line.split("\t")));
-            counter++;
-            if (counter == BENCHMARK_RECORD_LIMIT) {
-                break;
-            }
-        }
-        queue.put(Collections.emptyList());
-    }
-
-    private void loadDataFromRandom(LinkedBlockingQueue queue) throws IOException, InterruptedException {
-        queue.put(Collections.emptyList());
-    }
-
-
-    @Test
-    public void test() throws Exception {
-        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        final CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
-        final CubeInstance cube = cubeManager.getCube(CUBE_NAME);
-        final CubeSegment cubeSegment = cube.getFirstSegment();
-
-        LinkedBlockingQueue queue = new LinkedBlockingQueue<List<String>>();
-
-        InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube, getDictionaryMap(cubeSegment), new ConsoleGTRecordWriter());
-        ExecutorService executorService = Executors.newSingleThreadExecutor();
-        Future<?> future = executorService.submit(cubeBuilder);
-        loadDataFromLocalFile(queue);
-        future.get();
-        logger.info("stream build finished");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/86742593/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java
new file mode 100644
index 0000000..9d62fda
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java
@@ -0,0 +1,76 @@
+package org.apache.kylin.job.streaming;
+
+import org.apache.hadoop.util.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.job.DeployUtil;
+import org.apache.kylin.streaming.StreamMessage;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+
+/**
+ */
+public class CubeStreamBuilderTest {
+
+    private static final Logger logger = LoggerFactory.getLogger(CubeStreamBuilderTest.class);
+
+    private KylinConfig kylinConfig;
+
+    private static final String CUBE_NAME = "test_kylin_cube_without_slr_left_join_ready";
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+        System.setProperty("hdp.version", "2.2.0.0-2041"); // mapred-site.xml ref this
+    }
+
+    @Before
+    public void before() throws Exception {
+        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+        DeployUtil.initCliWorkDir();
+        DeployUtil.deployMetadata();
+        DeployUtil.overrideJobJarLocations();
+        final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(CUBE_NAME);
+        cube.getSegments().clear();
+        CubeManager.getInstance(kylinConfig).updateCube(cube, true);
+
+    }
+
+    @Test
+    public void test() throws Exception {
+        LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<>();
+        CubeStreamBuilder cubeStreamBuilder = new CubeStreamBuilder(queue, CUBE_NAME);
+        final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
+        loadDataFromLocalFile(queue, 100000);
+        future.get();
+    }
+
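+    // Feed up to maxCount tab-separated lines from ../table.txt into the queue, then signal end of stream with StreamMessage.EOF.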
+    private void loadDataFromLocalFile(BlockingQueue<StreamMessage> queue, final int maxCount) throws IOException, InterruptedException {
+        BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("../table.txt")));
+        String line;
+        int count = 0;
+        try {
+            while ((line = br.readLine()) != null && count++ < maxCount) {
+                final List<String> strings = Arrays.asList(line.split("\t"));
+                queue.put(new StreamMessage(System.currentTimeMillis(), StringUtils.join(",", strings).getBytes()));
+            }
+        } finally {
+            br.close();
+        }
+        queue.put(StreamMessage.EOF);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/86742593/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index 67cb109..a25dce3 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -48,16 +48,14 @@ public abstract class StreamBuilder implements Runnable {
 
     private static final Logger logger = LoggerFactory.getLogger(StreamBuilder.class);
 
-    private static final int BATCH_BUILD_INTERVAL_THRESHOLD = 2 * 60 * 1000;
-    private final int sliceSize;
+
     private StreamParser streamParser = StringStreamParser.instance;
 
     private BlockingQueue<StreamMessage> streamMessageQueue;
     private long lastBuildTime = System.currentTimeMillis();
 
-    public StreamBuilder(BlockingQueue<StreamMessage> streamMessageQueue, int sliceSize) {
+    public StreamBuilder(BlockingQueue<StreamMessage> streamMessageQueue) {
         this.streamMessageQueue = streamMessageQueue;
-        this.sliceSize = sliceSize;
     }
 
     protected abstract void build(List<StreamMessage> streamsToBuild) throws Exception;
@@ -84,7 +82,7 @@ public abstract class StreamBuilder implements Runnable {
                 if (streamMessage == null) {
 
                     logger.info("The stream queue is drained, current available stream count: " + streamMessageToBuild.size());
-                    if ((System.currentTimeMillis() - lastBuildTime) > BATCH_BUILD_INTERVAL_THRESHOLD) {
+                    if ((System.currentTimeMillis() - lastBuildTime) > batchInterval()) {
                         build(streamMessageToBuild);
                         clearCounter();
                         streamMessageToBuild.clear();
@@ -98,7 +96,7 @@ public abstract class StreamBuilder implements Runnable {
                     }
                 }
                 streamMessageToBuild.add(streamMessage);
-                if (streamMessageToBuild.size() >= this.sliceSize) {
+                if (streamMessageToBuild.size() >= batchSize()) {
                     build(streamMessageToBuild);
                     clearCounter();
                     streamMessageToBuild.clear();
@@ -117,4 +115,7 @@ public abstract class StreamBuilder implements Runnable {
     public final void setStreamParser(StreamParser streamParser) {
         this.streamParser = streamParser;
     }
+
+    protected abstract int batchInterval();
+    protected abstract int batchSize();
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/86742593/streaming/src/main/java/org/apache/kylin/streaming/cube/IGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/cube/IGTRecordWriter.java b/streaming/src/main/java/org/apache/kylin/streaming/cube/IGTRecordWriter.java
new file mode 100644
index 0000000..2d1e97e
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/cube/IGTRecordWriter.java
@@ -0,0 +1,11 @@
+package org.apache.kylin.streaming.cube;
+
+import org.apache.kylin.storage.gridtable.GTRecord;
+
+import java.io.IOException;
+
+/**
+ */
+public interface IGTRecordWriter {
+    void write(Long cuboidId, GTRecord record) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/86742593/streaming/src/main/java/org/apache/kylin/streaming/cube/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/cube/InMemCubeBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/cube/InMemCubeBuilder.java
new file mode 100644
index 0000000..e5a5b5f
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/cube/InMemCubeBuilder.java
@@ -0,0 +1,567 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.streaming.cube;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.cube.CubeGridTable;
+import org.apache.kylin.storage.gridtable.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ */
+public class InMemCubeBuilder implements Runnable {
+
+    private static final Logger logger = LoggerFactory.getLogger(InMemCubeBuilder.class);
+    private static final int DEFAULT_TIMEOUT = 25;
+
+    private BlockingQueue<List<String>> queue;
+    private CubeDesc desc = null;
+    private long baseCuboidId;
+    private CuboidScheduler cuboidScheduler = null;
+    private Map<TblColRef, Dictionary<?>> dictionaryMap = null;
+    private CubeJoinedFlatTableDesc intermediateTableDesc;
+    private MeasureCodec measureCodec;
+    private String[] metricsAggrFuncs = null;
+    private Map<Integer, Integer> dependentMeasures = null; // key: index of a measure that declares a dependent_measure_ref; value: index of the measure it depends on
+    public static final LongWritable ONE = new LongWritable(1L);
+    private int[] hbaseMeasureRefIndex;
+    private MeasureDesc[] measureDescs;
+    private int measureCount;
+
+    protected IGTRecordWriter gtRecordWriter;
+
+
+    /**
+     * @param queue          input queue of parsed rows; an empty list signals end of input
+     * @param cube           the cube instance to build
+     * @param dictionaryMap  dictionaries for the dictionary-encoded columns of the cube
+     * @param gtRecordWriter sink that receives the records of every completed cuboid
+     */
+    public InMemCubeBuilder(BlockingQueue<List<String>> queue, CubeInstance cube, Map<TblColRef, Dictionary<?>> dictionaryMap, IGTRecordWriter gtRecordWriter) {
+        if (dictionaryMap == null || dictionaryMap.isEmpty()) {
+            throw new IllegalArgumentException("dictionary cannot be empty");
+        }
+        this.queue = queue;
+        this.desc = cube.getDescriptor();
+        this.cuboidScheduler = new CuboidScheduler(desc);
+        this.dictionaryMap = dictionaryMap;
+        this.gtRecordWriter = gtRecordWriter;
+        this.baseCuboidId = Cuboid.getBaseCuboidId(desc);
+        this.intermediateTableDesc = new CubeJoinedFlatTableDesc(desc, null);
+        this.measureCodec = new MeasureCodec(desc.getMeasures());
+
+        Map<String, Integer> measureIndexMap = Maps.newHashMap();
+        List<String> metricsAggrFuncsList = Lists.newArrayList();
+        measureCount = desc.getMeasures().size();
+
+        List<MeasureDesc> measureDescsList = Lists.newArrayList();
+        hbaseMeasureRefIndex = new int[measureCount];
+        int measureRef = 0;
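+        // Map each measure, in the order it appears in the HBase column family layout, back to its index in the cube descriptor's measure list.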
+        for (HBaseColumnFamilyDesc familyDesc : desc.getHbaseMapping().getColumnFamily()) {
+            for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
+                for (MeasureDesc measure : hbaseColDesc.getMeasures()) {
+                    for (int j = 0; j < measureCount; j++) {
+                        if (desc.getMeasures().get(j).equals(measure)) {
+                            measureDescsList.add(measure);
+                            hbaseMeasureRefIndex[measureRef] = j;
+                            break;
+                        }
+                    }
+                    measureRef++;
+                }
+            }
+        }
+
+        for (int i = 0; i < measureCount; i++) {
+            MeasureDesc measureDesc = measureDescsList.get(i);
+            metricsAggrFuncsList.add(measureDesc.getFunction().getExpression());
+            measureIndexMap.put(measureDesc.getName(), i);
+        }
+        this.metricsAggrFuncs = metricsAggrFuncsList.toArray(new String[metricsAggrFuncsList.size()]);
+
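+        // For every measure that declares a dependent_measure_ref, remember the index of the measure it depends on.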
+        this.dependentMeasures = Maps.newHashMap();
+        for (int i = 0; i < measureCount; i++) {
+            String depMsrRef = measureDescsList.get(i).getDependentMeasureRef();
+            if (depMsrRef != null) {
+                int index = measureIndexMap.get(depMsrRef);
+                dependentMeasures.put(i, index);
+            }
+        }
+
+        this.measureDescs = desc.getMeasures().toArray(new MeasureDesc[measureCount]);
+    }
+
+
+    private GridTable newGridTableByCuboidID(long cuboidID, boolean memStore) {
+        GTInfo info = CubeGridTable.newGTInfo(desc, cuboidID, dictionaryMap);
+        GTComboStore store = new GTComboStore(info, memStore);
+        GridTable gridTable = new GridTable(info, store);
+        return gridTable;
+    }
+
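+    // Compute a child cuboid from its parent: clear the dimensions dropped by the child cuboid id, then aggregate the remaining columns.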
+    private GridTable aggregateCuboid(GridTable parentCuboid, long parentCuboidId, long cuboidId) throws IOException {
+        Pair<BitSet, BitSet> columnBitSets = getDimensionAndMetricColumnBitSet(parentCuboidId);
+        BitSet parentDimensions = columnBitSets.getFirst();
+        BitSet measureColumns = columnBitSets.getSecond();
+        BitSet childDimensions = (BitSet) parentDimensions.clone();
+
+        long mask = Long.highestOneBit(parentCuboidId);
+        long childCuboidId = cuboidId;
+        long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(parentCuboidId);
+        int index = 0;
+        for (int i = 0; i < parentCuboidIdActualLength; i++) {
+            if ((mask & parentCuboidId) > 0) {
+                if ((mask & childCuboidId) == 0) {
+                    // this dim will be aggregated
+                    childDimensions.set(index, false);
+                }
+                index++;
+            }
+            mask = mask >> 1;
+        }
+
+        return scanAndAggregateGridTable(parentCuboid, cuboidId, childDimensions, measureColumns);
+
+    }
+
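+    // Scan the parent grid table with an aggregating scan request and write the result into a new in-memory grid table;
+    // measures with a dependent_measure_ref are overwritten with the count estimate of the HLL counter they depend on.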
+    private GridTable scanAndAggregateGridTable(GridTable gridTable, long cuboidId, BitSet aggregationColumns, BitSet measureColumns) throws IOException {
+        GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, aggregationColumns, measureColumns, metricsAggrFuncs, null);
+        IGTScanner scanner = gridTable.scan(req);
+        GridTable newGridTable = newGridTableByCuboidID(cuboidId, true);
+        GTBuilder builder = newGridTable.rebuild();
+
+        BitSet allNeededColumns = new BitSet();
+        allNeededColumns.or(aggregationColumns);
+        allNeededColumns.or(measureColumns);
+
+        GTRecord newRecord = new GTRecord(newGridTable.getInfo());
+        int counter = 0;
+        ByteArray byteArray = new ByteArray(8);
+        ByteBuffer byteBuffer = ByteBuffer.allocate(8);
+        try {
+            BitSet dependentMetrics = new BitSet(allNeededColumns.cardinality());
+            for (Integer i : dependentMeasures.keySet()) {
+                dependentMetrics.set((allNeededColumns.cardinality() - measureCount + dependentMeasures.get(i)));
+            }
+
+            Object[] hllObjects = new Object[dependentMeasures.keySet().size()];
+
+            for (GTRecord record : scanner) {
+                counter++;
+                for (int i = allNeededColumns.nextSetBit(0), index = 0; i >= 0; i = allNeededColumns.nextSetBit(i + 1), index++) {
+                    newRecord.set(index, record.get(i));
+                }
+
+                if (dependentMeasures.size() > 0) {
+                    // update measures which have 'dependent_measure_ref'
+                    newRecord.getValues(dependentMetrics, hllObjects);
+
+                    for (Integer i : dependentMeasures.keySet()) {
+                        for (int index = 0, c = dependentMetrics.nextSetBit(0); c >= 0; index++, c = dependentMetrics.nextSetBit(c + 1)) {
+                            if (c == allNeededColumns.cardinality() - measureCount + dependentMeasures.get(i)) {
+                                assert hllObjects[index] instanceof HyperLogLogPlusCounter; // currently only HLL is allowed
+
+                                byteBuffer.clear();
+                                BytesUtil.writeVLong(((HyperLogLogPlusCounter) hllObjects[index]).getCountEstimate(), byteBuffer);
+                                byteArray.set(byteBuffer.array(), 0, byteBuffer.position());
+                                newRecord.set(allNeededColumns.cardinality() - measureCount + i, byteArray);
+                            }
+                        }
+
+                    }
+                }
+
+                builder.write(newRecord);
+            }
+        } finally {
+            builder.close();
+        }
+        logger.info("Cuboid " + cuboidId + " has rows: " + counter);
+
+        return newGridTable;
+    }
+
+    private Pair<BitSet, BitSet> getDimensionAndMetricColumnBitSet(long cuboidId) {
+        BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
+        BitSet dimension = new BitSet();
+        dimension.set(0, bitSet.cardinality());
+        BitSet metrics = new BitSet();
+        metrics.set(bitSet.cardinality(), bitSet.cardinality() + this.measureCount);
+        return new Pair<BitSet, BitSet>(dimension, metrics);
+    }
+
+    private Object[] buildKey(List<String> row) {
+        int keySize = intermediateTableDesc.getRowKeyColumnIndexes().length;
+        Object[] key = new Object[keySize];
+
+        for (int i = 0; i < keySize; i++) {
+            key[i] = row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]);
+        }
+
+        return key;
+    }
+
+    private Object[] buildValue(List<String> row) {
+
+        Object[] values = new Object[measureCount];
+        MeasureDesc measureDesc = null;
+
+        for (int position = 0; position < hbaseMeasureRefIndex.length; position++) {
+            int i = hbaseMeasureRefIndex[position];
+            measureDesc = measureDescs[i];
+
+            Object value = null;
+            int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[i];
+            FunctionDesc function = desc.getMeasures().get(i).getFunction();
+            if (function.isCount() || function.isHolisticCountDistinct()) {
+                // note for holistic count distinct, this value will be ignored
+                value = ONE;
+            } else if (flatTableIdx == null) {
+                value = measureCodec.getSerializer(i).valueOf(measureDesc.getFunction().getParameter().getValue());
+            } else if (flatTableIdx.length == 1) {
+                value = measureCodec.getSerializer(i).valueOf(Bytes.toBytes(row.get(flatTableIdx[0])));
+            } else {
+
+                byte[] result = null;
+                for (int x = 0; x < flatTableIdx.length; x++) {
+                    byte[] split = Bytes.toBytes(row.get(flatTableIdx[x]));
+                    if (result == null) {
+                        result = Arrays.copyOf(split, split.length);
+                    } else {
+                        byte[] newResult = new byte[result.length + split.length];
+                        System.arraycopy(result, 0, newResult, 0, result.length);
+                        System.arraycopy(split, 0, newResult, result.length, split.length);
+                        result = newResult;
+                    }
+                }
+                value = measureCodec.getSerializer(i).valueOf(result);
+            }
+            values[position] = value;
+        }
+        return values;
+    }
+
+
+    @Override
+    public void run() {
+        try {
+            logger.info("Create base cuboid " + baseCuboidId);
+            final GridTable baseCuboidGT = newGridTableByCuboidID(baseCuboidId, false);
+
+            GTBuilder baseGTBuilder = baseCuboidGT.rebuild();
+            final GTRecord baseGTRecord = new GTRecord(baseCuboidGT.getInfo());
+
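+            // Adapt the input queue to an IGTScanner: each list taken from the queue becomes the base cuboid GTRecord; an empty list marks end of input.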
+            IGTScanner queueScanner = new IGTScanner() {
+
+                @Override
+                public Iterator<GTRecord> iterator() {
+                    return new Iterator<GTRecord>() {
+
+                        List<String> currentObject = null;
+
+                        @Override
+                        public boolean hasNext() {
+                            try {
+                                currentObject = queue.take();
+                            } catch (InterruptedException e) {
+                                throw new RuntimeException(e);
+                            }
+                            return currentObject != null && currentObject.size() > 0;
+                        }
+
+                        @Override
+                        public GTRecord next() {
+                            if (currentObject.size() == 0)
+                                throw new IllegalStateException();
+
+                            buildGTRecord(currentObject, baseGTRecord);
+                            return baseGTRecord;
+                        }
+
+                        @Override
+                        public void remove() {
+                            throw new UnsupportedOperationException();
+                        }
+                    };
+                }
+
+                @Override
+                public void close() throws IOException {
+                }
+
+                @Override
+                public GTInfo getInfo() {
+                    return baseCuboidGT.getInfo();
+                }
+
+                @Override
+                public int getScannedRowCount() {
+                    return 0;
+                }
+
+                @Override
+                public int getScannedRowBlockCount() {
+                    return 0;
+                }
+            };
+
+            Pair<BitSet, BitSet> dimensionMetricsBitSet = getDimensionAndMetricColumnBitSet(baseCuboidId);
+            GTScanRequest req = new GTScanRequest(baseCuboidGT.getInfo(), null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null);
+            IGTScanner aggregationScanner = new GTAggregateScanner(queueScanner, req);
+
+            int counter = 0;
+            for (GTRecord r : aggregationScanner) {
+                baseGTBuilder.write(r);
+                counter++;
+            }
+            baseGTBuilder.close();
+            aggregationScanner.close();
+
+            logger.info("Base cuboid has " + counter + " rows;");
+            SimpleGridTableTree tree = new SimpleGridTableTree();
+            tree.data = baseCuboidGT;
+            tree.id = baseCuboidId;
+            tree.parent = null;
+            if (counter > 0) {
+                List<Long> children = cuboidScheduler.getSpanningCuboid(baseCuboidId);
+                Collections.sort(children);
+                for (Long childId : children) {
+                    createNDCuboidGT(tree, baseCuboidId, childId);
+                }
+            }
+            outputGT(baseCuboidId, baseCuboidGT);
+            dropStore(baseCuboidGT);
+
+        } catch (IOException e) {
+            logger.error("Fail to build cube", e);
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    private void buildGTRecord(List<String> row, GTRecord record) {
+
+        Object[] dimensions = buildKey(row);
+        Object[] metricsValues = buildValue(row);
+        Object[] recordValues = new Object[dimensions.length + metricsValues.length];
+        System.arraycopy(dimensions, 0, recordValues, 0, dimensions.length);
+        System.arraycopy(metricsValues, 0, recordValues, dimensions.length, metricsValues.length);
+        record.setValues(recordValues);
+    }
+
+    private boolean gc(TreeNode<GridTable> parentNode) {
+        final List<TreeNode<GridTable>> gridTables = parentNode.getAncestorList();
+        logger.info("trying to select node to flush to disk, from:" + StringUtils.join(",", gridTables));
+        for (TreeNode<GridTable> gridTable : gridTables) {
+            final GTComboStore store = (GTComboStore) gridTable.data.getStore();
+            if (store.memoryUsage() > 0) {
+                logger.info("cuboid id:" + gridTable.id + " flush to disk");
+                long t = System.currentTimeMillis();
+                store.switchToDiskStore();
+                logger.info("switch to disk store cost:" + (System.currentTimeMillis() - t) + "ms");
+                waitForGc();
+                return true;
+            }
+        }
+        logger.warn("all ancestor nodes of " + parentNode.id + " has been flushed to disk");
+        return false;
+
+    }
+
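+    // Run the aggregation in a dedicated single-thread executor so that an OutOfMemoryError or a timeout can be detected;
+    // on failure the executor is shut down and null is returned, letting the caller flush ancestors to disk and retry.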
+    private GridTable createChildCuboid(final GridTable parentCuboid, final long parentCuboidId, final long cuboidId) {
+        final ExecutorService executorService = Executors.newSingleThreadExecutor();
+        final Future<GridTable> task = executorService.submit(new Callable<GridTable>() {
+            @Override
+            public GridTable call() throws Exception {
+                return aggregateCuboid(parentCuboid, parentCuboidId, cuboidId);
+            }
+        });
+        try {
+            final GridTable gridTable = task.get(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
+            return gridTable;
+        } catch (InterruptedException e) {
+            throw new RuntimeException("this should not happen", e);
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof OutOfMemoryError) {
+                logger.warn("Future.get() OutOfMemory, stop the thread");
+            } else {
+                throw new RuntimeException("this should not happen", e);
+            }
+        } catch (TimeoutException e) {
+            logger.warn("Future.get() timeout, stop the thread");
+        }
+        logger.info("shutdown executor service");
+        final List<Runnable> runnables = executorService.shutdownNow();
+        try {
+            executorService.awaitTermination(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
+            waitForGc();
+        } catch (InterruptedException e) {
+            throw new RuntimeException("this should not happen", e);
+        }
+        return null;
+
+    }
+
+    private void waitForGc() {
+        System.gc();
+        logger.info("wait 5 seconds for gc");
+        try {
+            Thread.sleep(5000);
+        } catch (InterruptedException e) {
+            throw new RuntimeException("should not happen", e);
+        }
+    }
+
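+    // Depth-first build of the cuboid tree: retry the child aggregation (flushing ancestors on memory pressure), recurse into
+    // the child's spanning cuboids, then output the child's grid table and drop its store.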
+    private void createNDCuboidGT(SimpleGridTableTree parentNode, long parentCuboidId, long cuboidId) throws IOException {
+
+        long startTime = System.currentTimeMillis();
+
+//        GTComboStore parentStore = (GTComboStore) parentNode.data.getStore();
+//        if (parentStore.memoryUsage() <= 0) {
+//            long t = System.currentTimeMillis();
+//            parentStore.switchToMemStore();
+//            logger.info("node " + parentNode.id + " switch to mem store cost:" + (System.currentTimeMillis() - t) + "ms");
+//        }
+
+        GridTable currentCuboid;
+        while (true) {
+            logger.info("Calculating cuboid " + cuboidId + " from parent " + parentCuboidId);
+            currentCuboid = createChildCuboid(parentNode.data, parentCuboidId, cuboidId);
+            if (currentCuboid != null) {
+                break;
+            } else {
+                logger.warn("create child cuboid:" + cuboidId + " from parent:" + parentCuboidId + " failed, prepare to gc");
+                if (gc(parentNode)) {
+                    continue;
+                } else {
+                    logger.warn("all parent node has been flushed into disk, memory is still insufficient");
+                    throw new RuntimeException("all parent node has been flushed into disk, memory is still insufficient");
+                }
+            }
+        }
+        SimpleGridTableTree node = new SimpleGridTableTree();
+        node.parent = parentNode;
+        node.data = currentCuboid;
+        node.id = cuboidId;
+        parentNode.children.add(node);
+
+        logger.info("Cuboid " + cuboidId + " build takes " + (System.currentTimeMillis() - startTime) + "ms");
+
+        List<Long> children = cuboidScheduler.getSpanningCuboid(cuboidId);
+        if (!children.isEmpty()) {
+            Collections.sort(children); // sort cuboids
+            for (Long childId : children) {
+                createNDCuboidGT(node, cuboidId, childId);
+            }
+        }
+
+
+        //output the grid table
+        outputGT(cuboidId, currentCuboid);
+        dropStore(currentCuboid);
+        parentNode.children.remove(node);
+        if (parentNode.children.size() > 0) {
+            logger.info("cuboid:" + cuboidId + " has finished, parent node:" + parentNode.id + " need to switch to mem store");
+            ((GTComboStore) parentNode.data.getStore()).switchToMemStore();
+        }
+    }
+
+    private void dropStore(GridTable gt) throws IOException {
+        ((GTComboStore) gt.getStore()).drop();
+    }
+
+
+    private void outputGT(Long cuboidId, GridTable gridTable) throws IOException {
+        long startTime = System.currentTimeMillis();
+        GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
+        IGTScanner scanner = gridTable.scan(req);
+        for (GTRecord record : scanner) {
+            this.gtRecordWriter.write(cuboidId, record);
+        }
+        logger.info("Cuboid" + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
+    }
+
+    private static class TreeNode<T> {
+        T data;
+        long id;
+        TreeNode<T> parent;
+        List<TreeNode<T>> children = Lists.newArrayList();
+
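+        // Returns the path from the root down to (and including) this node.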
+        List<TreeNode<T>> getAncestorList() {
+            ArrayList<TreeNode<T>> result = Lists.newArrayList();
+            TreeNode<T> parent = this;
+            while (parent != null) {
+                result.add(parent);
+                parent = parent.parent;
+            }
+            return Lists.reverse(result);
+        }
+
+        @Override
+        public String toString() {
+            return id + "";
+        }
+    }
+
+    private static class SimpleGridTableTree extends TreeNode<GridTable> {
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/86742593/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
index 755b490..56435bd 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
@@ -61,16 +61,19 @@ import java.util.concurrent.TimeUnit;
 public class IIStreamBuilder extends StreamBuilder {
 
     private static Logger logger = LoggerFactory.getLogger(IIStreamBuilder.class);
+    private static final int BATCH_BUILD_INTERVAL_THRESHOLD = 2 * 60 * 1000;
 
     private final IIDesc desc;
     private final HTableInterface hTable;
     private final SliceBuilder sliceBuilder;
     private final int shardId;
     private final String streaming;
+    private final int batchSize;
     private StreamingManager streamingManager;
 
     public IIStreamBuilder(BlockingQueue<StreamMessage> queue, String streaming, String hTableName, IIDesc iiDesc, int shard) {
-        super(queue, iiDesc.getSliceSize());
+        super(queue);
+        this.batchSize = iiDesc.getSliceSize();
         this.streaming = streaming;
         this.desc = iiDesc;
         this.shardId = shard;
@@ -117,6 +120,16 @@ public class IIStreamBuilder extends StreamBuilder {
         }
     }
 
+    @Override
+    protected int batchInterval() {
+        return BATCH_BUILD_INTERVAL_THRESHOLD;
+    }
+
+    @Override
+    protected int batchSize() {
+        return batchSize;
+    }
+
     private void loadToHBase(HTableInterface hTable, Slice slice, IIKeyValueCodec codec) throws IOException {
         List<Put> data = Lists.newArrayList();
         for (IIRow row : codec.encodeKeyValue(slice)) {