Posted to commits@kylin.apache.org by ma...@apache.org on 2015/05/29 09:44:52 UTC

[4/4] incubator-kylin git commit: streaming cubing

streaming cubing


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/a6a9d940
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/a6a9d940
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/a6a9d940

Branch: refs/heads/0.8.0
Commit: a6a9d940ca220f9c94d4c4ff6cc3a2cccb617c0b
Parents: cf25daa
Author: qianhao.zhou <qi...@ebay.com>
Authored: Fri May 22 17:13:25 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri May 29 15:44:25 2015 +0800

----------------------------------------------------------------------
 .../common/persistence/HBaseResourceStore.java  |  35 +-
 .../org/apache/kylin/common/util/TimeUtil.java  |  13 +-
 .../apache/kylin/common/util/TimeUtilTest.java  |  25 +-
 .../job/hadoop/cubev2/IGTRecordWriter.java      |  11 -
 .../job/hadoop/cubev2/InMemCubeBuilder.java     | 567 -------------------
 .../job/hadoop/cubev2/InMemCuboidMapper.java    |   1 +
 .../hadoop/cubev2/MapContextGTRecordWriter.java |   1 +
 .../kylin/job/streaming/CubeStreamBuilder.java  | 403 +++++++++++++
 .../kylin/job/streaming/StreamingBootstrap.java |  96 +++-
 .../kylin/job/BuildCubeWithStreamTest.java      |   4 +-
 .../job/ITKafkaBasedIIStreamBuilderTest.java    |  32 +-
 .../apache/kylin/job/InMemCubeBuilderTest.java  |   4 +-
 .../cubev2/InMemCubeBuilderBenchmarkTest.java   | 119 ----
 .../job/streaming/CubeStreamBuilderTest.java    |  76 +++
 .../kylin/storage/StorageEngineFactory.java     |   3 +-
 .../org/apache/kylin/streaming/KafkaConfig.java |  12 +-
 .../kylin/streaming/SEOJsonStreamParser.java    | 100 ++++
 .../apache/kylin/streaming/StreamBuilder.java   |  20 +-
 .../kylin/streaming/cube/IGTRecordWriter.java   |  11 +
 .../kylin/streaming/cube/InMemCubeBuilder.java  | 567 +++++++++++++++++++
 .../invertedindex/IIStreamBuilder.java          |  15 +-
 .../cube/InMemCubeBuilderBenchmarkTest.java     | 117 ++++
 .../invertedindex/PrintOutStreamBuilder.java    |  70 ---
 23 files changed, 1437 insertions(+), 865 deletions(-)
----------------------------------------------------------------------
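At a high level, this commit moves the in-memory cube builder under the streaming module and adds CubeStreamBuilder, which batches Kafka stream messages, samples cuboid cardinality, builds dictionaries, and then drives InMemCubeBuilder to write a new cube segment into HBase. A rough wiring sketch of the core hand-off, reconstructed from CubeStreamBuilder.build() further down (the input variables are assumed to be prepared the same way build() prepares them and are not defined here; this sketch is not part of the diff):

    // parsedRows: List<List<String>> of already-parsed stream messages
    // cubeInstance / dictionaryMap / gtRecordWriter: prepared as in CubeStreamBuilder.build()
    BlockingQueue<List<String>> queue = new LinkedBlockingQueue<List<String>>(parsedRows);
    queue.put(Collections.<String>emptyList());          // an empty row signals end-of-input to InMemCubeBuilder
    InMemCubeBuilder builder = new InMemCubeBuilder(queue, cubeInstance, dictionaryMap, gtRecordWriter);
    Executors.newSingleThreadExecutor().submit(builder).get();   // InMemCubeBuilder is a Runnable; wait for the build to finish
    gtRecordWriter.flush();                              // push any remaining HBase Puts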


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java b/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
index 2868368..e665298 100644
--- a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
+++ b/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
@@ -18,34 +18,27 @@
 
 package org.apache.kylin.common.persistence;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.*;
 import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
-import org.apache.kylin.common.util.Bytes;
-
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.HadoopUtil;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
 public class HBaseResourceStore extends ResourceStore {
 
     private static final String DEFAULT_TABLE_NAME = "kylin_metadata";
@@ -197,8 +190,10 @@ public class HBaseResourceStore extends ResourceStore {
             Put put = buildPut(resPath, newTS, row, content, table);
 
             boolean ok = table.checkAndPut(row, B_FAMILY, B_COLUMN_TS, bOldTS, put);
-            if (!ok)
-                throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but it is " + getResourceTimestamp(resPath));
+            if (!ok) {
+                long real = getResourceTimestamp(resPath);
+                throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but it is " + real);
+            }
 
             table.flushCommits();
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/TimeUtil.java b/common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
index 26e3e06..0aa58e4 100644
--- a/common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
+++ b/common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
@@ -3,8 +3,13 @@ package org.apache.kylin.common.util;
 /**
  */
 public class TimeUtil {
-    private static int ONE_MINUTE_TS = 60 * 1000;
-    private static int ONE_HOUR_TS = 60 * 60 * 1000;
+    public enum NormalizedTimeUnit {
+        MINUTE, HOUR, DAY
+    }
+
+    private static long ONE_MINUTE_TS = 60 * 1000;
+    private static long ONE_HOUR_TS = 60 * ONE_MINUTE_TS;
+    private static long ONE_DAY_TS = 24 * ONE_HOUR_TS;
 
     public static long getMinuteStart(long ts) {
         return ts / ONE_MINUTE_TS * ONE_MINUTE_TS;
@@ -13,4 +18,8 @@ public class TimeUtil {
     public static long getHourStart(long ts) {
         return ts / ONE_HOUR_TS * ONE_HOUR_TS;
     }
+
+    public static long getDayStart(long ts) {
+        return ts / ONE_DAY_TS * ONE_DAY_TS;
+    }
 }
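Like getMinuteStart and getHourStart, the new getDayStart relies on integer division, so it snaps a millisecond timestamp to the UTC day boundary. A quick illustrative check (not part of the diff):

    long ts = 1432888465000L;                 // 2015-05-29 08:34:25 UTC, an arbitrary example
    long dayStart = TimeUtil.getDayStart(ts); // 1432857600000L == 2015-05-29 00:00:00 UTC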

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java b/common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java
index 90a0c40..cfa11d8 100644
--- a/common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java
@@ -1,24 +1,25 @@
 package org.apache.kylin.common.util;
 
+import org.apache.kylin.common.util.TimeUtil.NormalizedTimeUnit;
+import org.junit.Assert;
+import org.junit.Test;
+
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Calendar;
 
-import org.junit.Assert;
-import org.junit.Test;
-
 /**
  */
 public class TimeUtilTest {
-    public static long normalizeTime(long timeMillis, NormalizeUnit unit) {
+    public static long normalizeTime(long timeMillis, NormalizedTimeUnit unit) {
         Calendar a = Calendar.getInstance();
         Calendar b = Calendar.getInstance();
         b.clear();
 
         a.setTimeInMillis(timeMillis);
-        if (unit == NormalizeUnit.MINUTE) {
+        if (unit == NormalizedTimeUnit.MINUTE) {
             b.set(a.get(Calendar.YEAR), a.get(Calendar.MONTH), a.get(Calendar.DAY_OF_MONTH), a.get(Calendar.HOUR_OF_DAY), a.get(Calendar.MINUTE));
-        } else if (unit == NormalizeUnit.HOUR) {
+        } else if (unit == NormalizedTimeUnit.HOUR) {
             b.set(a.get(Calendar.YEAR), a.get(Calendar.MONTH), a.get(Calendar.DAY_OF_MONTH), a.get(Calendar.HOUR_OF_DAY), 0);
         }
         return b.getTimeInMillis();
@@ -29,15 +30,13 @@ public class TimeUtilTest {
         java.text.DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
 
         long t1 = dateFormat.parse("2012/01/01 00:00:01").getTime();
-        Assert.assertEquals(normalizeTime(t1, NormalizeUnit.HOUR), TimeUtil.getHourStart(t1));
-        Assert.assertEquals(normalizeTime(t1, NormalizeUnit.MINUTE), TimeUtil.getMinuteStart(t1));
+        Assert.assertEquals(normalizeTime(t1, NormalizedTimeUnit.HOUR), TimeUtil.getHourStart(t1));
+        Assert.assertEquals(normalizeTime(t1, NormalizedTimeUnit.MINUTE), TimeUtil.getMinuteStart(t1));
 
         long t2 = dateFormat.parse("2012/12/31 11:02:01").getTime();
-        Assert.assertEquals(normalizeTime(t2, NormalizeUnit.HOUR), TimeUtil.getHourStart(t2));
-        Assert.assertEquals(normalizeTime(t2, NormalizeUnit.MINUTE), TimeUtil.getMinuteStart(t2));
+        Assert.assertEquals(normalizeTime(t2, NormalizedTimeUnit.HOUR), TimeUtil.getHourStart(t2));
+        Assert.assertEquals(normalizeTime(t2, NormalizedTimeUnit.MINUTE), TimeUtil.getMinuteStart(t2));
     }
 
-    public enum NormalizeUnit {
-        MINUTE, HOUR
-    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/IGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/IGTRecordWriter.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/IGTRecordWriter.java
deleted file mode 100644
index cccc995..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/IGTRecordWriter.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package org.apache.kylin.job.hadoop.cubev2;
-
-import org.apache.kylin.storage.gridtable.GTRecord;
-
-import java.io.IOException;
-
-/**
- */
-public interface IGTRecordWriter {
-    void write(Long cuboidId, GTRecord record) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java
deleted file mode 100644
index 56989a6..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java
+++ /dev/null
@@ -1,567 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-package org.apache.kylin.job.hadoop.cubev2;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.cuboid.CuboidScheduler;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.cube.CubeGridTable;
-import org.apache.kylin.storage.gridtable.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- */
-public class InMemCubeBuilder implements Runnable {
-
-    private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder.class);
-    private static final int DEFAULT_TIMEOUT = 25;
-
-    private BlockingQueue<List<String>> queue;
-    private CubeDesc desc = null;
-    private long baseCuboidId;
-    private CuboidScheduler cuboidScheduler = null;
-    private Map<TblColRef, Dictionary<?>> dictionaryMap = null;
-    private CubeJoinedFlatTableDesc intermediateTableDesc;
-    private MeasureCodec measureCodec;
-    private String[] metricsAggrFuncs = null;
-    private Map<Integer, Integer> dependentMeasures = null; // key: index of Measure which depends on another measure; value: index of Measure which is depended on;
-    public static final LongWritable ONE = new LongWritable(1l);
-    private int[] hbaseMeasureRefIndex;
-    private MeasureDesc[] measureDescs;
-    private int measureCount;
-
-    protected IGTRecordWriter gtRecordWriter;
-
-
-    /**
-     * @param queue
-     * @param cube
-     * @param dictionaryMap
-     * @param gtRecordWriter
-     */
-    public InMemCubeBuilder(BlockingQueue<List<String>> queue, CubeInstance cube, Map<TblColRef, Dictionary<?>> dictionaryMap, IGTRecordWriter gtRecordWriter) {
-        if (dictionaryMap == null || dictionaryMap.isEmpty()) {
-            throw new IllegalArgumentException();
-        }
-        this.queue = queue;
-        this.desc = cube.getDescriptor();
-        this.cuboidScheduler = new CuboidScheduler(desc);
-        this.dictionaryMap = dictionaryMap;
-        this.gtRecordWriter = gtRecordWriter;
-        this.baseCuboidId = Cuboid.getBaseCuboidId(desc);
-        this.intermediateTableDesc = new CubeJoinedFlatTableDesc(desc, null);
-        this.measureCodec = new MeasureCodec(desc.getMeasures());
-
-        Map<String, Integer> measureIndexMap = Maps.newHashMap();
-        List<String> metricsAggrFuncsList = Lists.newArrayList();
-        measureCount = desc.getMeasures().size();
-
-        List<MeasureDesc> measureDescsList = Lists.newArrayList();
-        hbaseMeasureRefIndex = new int[measureCount];
-        int measureRef = 0;
-        for (HBaseColumnFamilyDesc familyDesc : desc.getHbaseMapping().getColumnFamily()) {
-            for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
-                for (MeasureDesc measure : hbaseColDesc.getMeasures()) {
-                    for (int j = 0; j < measureCount; j++) {
-                        if (desc.getMeasures().get(j).equals(measure)) {
-                            measureDescsList.add(measure);
-                            hbaseMeasureRefIndex[measureRef] = j;
-                            break;
-                        }
-                    }
-                    measureRef++;
-                }
-            }
-        }
-
-        for (int i = 0; i < measureCount; i++) {
-            MeasureDesc measureDesc = measureDescsList.get(i);
-            metricsAggrFuncsList.add(measureDesc.getFunction().getExpression());
-            measureIndexMap.put(measureDesc.getName(), i);
-        }
-        this.metricsAggrFuncs = metricsAggrFuncsList.toArray(new String[metricsAggrFuncsList.size()]);
-
-        this.dependentMeasures = Maps.newHashMap();
-        for (int i = 0; i < measureCount; i++) {
-            String depMsrRef = measureDescsList.get(i).getDependentMeasureRef();
-            if (depMsrRef != null) {
-                int index = measureIndexMap.get(depMsrRef);
-                dependentMeasures.put(i, index);
-            }
-        }
-
-        this.measureDescs = desc.getMeasures().toArray(new MeasureDesc[measureCount]);
-    }
-
-
-    private GridTable newGridTableByCuboidID(long cuboidID, boolean memStore) {
-        GTInfo info = CubeGridTable.newGTInfo(desc, cuboidID, dictionaryMap);
-        GTComboStore store = new GTComboStore(info, memStore);
-        GridTable gridTable = new GridTable(info, store);
-        return gridTable;
-    }
-
-    private GridTable aggregateCuboid(GridTable parentCuboid, long parentCuboidId, long cuboidId) throws IOException {
-        Pair<BitSet, BitSet> columnBitSets = getDimensionAndMetricColumnBitSet(parentCuboidId);
-        BitSet parentDimensions = columnBitSets.getFirst();
-        BitSet measureColumns = columnBitSets.getSecond();
-        BitSet childDimensions = (BitSet) parentDimensions.clone();
-
-        long mask = Long.highestOneBit(parentCuboidId);
-        long childCuboidId = cuboidId;
-        long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(parentCuboidId);
-        int index = 0;
-        for (int i = 0; i < parentCuboidIdActualLength; i++) {
-            if ((mask & parentCuboidId) > 0) {
-                if ((mask & childCuboidId) == 0) {
-                    // this dim will be aggregated
-                    childDimensions.set(index, false);
-                }
-                index++;
-            }
-            mask = mask >> 1;
-        }
-
-        return scanAndAggregateGridTable(parentCuboid, cuboidId, childDimensions, measureColumns);
-
-    }
-
-    private GridTable scanAndAggregateGridTable(GridTable gridTable, long cuboidId, BitSet aggregationColumns, BitSet measureColumns) throws IOException {
-        GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, aggregationColumns, measureColumns, metricsAggrFuncs, null);
-        IGTScanner scanner = gridTable.scan(req);
-        GridTable newGridTable = newGridTableByCuboidID(cuboidId, true);
-        GTBuilder builder = newGridTable.rebuild();
-
-        BitSet allNeededColumns = new BitSet();
-        allNeededColumns.or(aggregationColumns);
-        allNeededColumns.or(measureColumns);
-
-        GTRecord newRecord = new GTRecord(newGridTable.getInfo());
-        int counter = 0;
-        ByteArray byteArray = new ByteArray(8);
-        ByteBuffer byteBuffer = ByteBuffer.allocate(8);
-        try {
-            BitSet dependentMetrics = new BitSet(allNeededColumns.cardinality());
-            for (Integer i : dependentMeasures.keySet()) {
-                dependentMetrics.set((allNeededColumns.cardinality() - measureCount + dependentMeasures.get(i)));
-            }
-
-            Object[] hllObjects = new Object[dependentMeasures.keySet().size()];
-
-            for (GTRecord record : scanner) {
-                counter++;
-                for (int i = allNeededColumns.nextSetBit(0), index = 0; i >= 0; i = allNeededColumns.nextSetBit(i + 1), index++) {
-                    newRecord.set(index, record.get(i));
-                }
-
-                if(dependentMeasures.size() > 0) {
-                    // update measures which have 'dependent_measure_ref'
-                    newRecord.getValues(dependentMetrics, hllObjects);
-
-                    for (Integer i : dependentMeasures.keySet()) {
-                        for (int index = 0, c = dependentMetrics.nextSetBit(0); c >= 0; index++, c = dependentMetrics.nextSetBit(c + 1)) {
-                            if (c == allNeededColumns.cardinality() - measureCount + dependentMeasures.get(i)) {
-                                assert hllObjects[index] instanceof HyperLogLogPlusCounter; // currently only HLL is allowed
-
-                                byteBuffer.clear();
-                                BytesUtil.writeVLong(((HyperLogLogPlusCounter) hllObjects[index]).getCountEstimate(), byteBuffer);
-                                byteArray.set(byteBuffer.array(), 0, byteBuffer.position());
-                                newRecord.set(allNeededColumns.cardinality() - measureCount + i, byteArray);
-                            }
-                        }
-
-                    }
-                }
-
-                builder.write(newRecord);
-            }
-        } finally {
-            builder.close();
-        }
-        logger.info("Cuboid " + cuboidId + " has rows: " + counter);
-
-        return newGridTable;
-    }
-
-    private Pair<BitSet, BitSet> getDimensionAndMetricColumnBitSet(long cuboidId) {
-        BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
-        BitSet dimension = new BitSet();
-        dimension.set(0, bitSet.cardinality());
-        BitSet metrics = new BitSet();
-        metrics.set(bitSet.cardinality(), bitSet.cardinality() + this.measureCount);
-        return new Pair<BitSet, BitSet>(dimension, metrics);
-    }
-
-    private Object[] buildKey(List<String> row) {
-        int keySize = intermediateTableDesc.getRowKeyColumnIndexes().length;
-        Object[] key = new Object[keySize];
-
-        for (int i = 0; i < keySize; i++) {
-            key[i] = row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]);
-        }
-
-        return key;
-    }
-
-    private Object[] buildValue(List<String> row) {
-
-        Object[] values = new Object[measureCount];
-        MeasureDesc measureDesc = null;
-
-        for (int position = 0; position < hbaseMeasureRefIndex.length; position++) {
-            int i = hbaseMeasureRefIndex[position];
-            measureDesc = measureDescs[i];
-
-            Object value = null;
-            int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[i];
-            FunctionDesc function = desc.getMeasures().get(i).getFunction();
-            if (function.isCount() || function.isHolisticCountDistinct()) {
-                // note for holistic count distinct, this value will be ignored
-                value = ONE;
-            } else if (flatTableIdx == null) {
-                value = measureCodec.getSerializer(i).valueOf(measureDesc.getFunction().getParameter().getValue());
-            } else if (flatTableIdx.length == 1) {
-                value = measureCodec.getSerializer(i).valueOf(Bytes.toBytes(row.get(flatTableIdx[0])));
-            } else {
-
-                byte[] result = null;
-                for (int x = 0; x < flatTableIdx.length; x++) {
-                    byte[] split = Bytes.toBytes(row.get(flatTableIdx[x]));
-                    if (result == null) {
-                        result = Arrays.copyOf(split, split.length);
-                    } else {
-                        byte[] newResult = new byte[result.length + split.length];
-                        System.arraycopy(result, 0, newResult, 0, result.length);
-                        System.arraycopy(split, 0, newResult, result.length, split.length);
-                        result = newResult;
-                    }
-                }
-                value = measureCodec.getSerializer(i).valueOf(result);
-            }
-            values[position] = value;
-        }
-        return values;
-    }
-
-
-    @Override
-    public void run() {
-        try {
-            logger.info("Create base cuboid " + baseCuboidId);
-            final GridTable baseCuboidGT = newGridTableByCuboidID(baseCuboidId, false);
-
-            GTBuilder baseGTBuilder = baseCuboidGT.rebuild();
-            final GTRecord baseGTRecord = new GTRecord(baseCuboidGT.getInfo());
-
-            IGTScanner queueScanner = new IGTScanner() {
-
-                @Override
-                public Iterator<GTRecord> iterator() {
-                    return new Iterator<GTRecord>() {
-
-                        List<String> currentObject = null;
-
-                        @Override
-                        public boolean hasNext() {
-                            try {
-                                currentObject = queue.take();
-                            } catch (InterruptedException e) {
-                                throw new RuntimeException(e);
-                            }
-                            return currentObject != null && currentObject.size() > 0;
-                        }
-
-                        @Override
-                        public GTRecord next() {
-                            if (currentObject.size() == 0)
-                                throw new IllegalStateException();
-
-                            buildGTRecord(currentObject, baseGTRecord);
-                            return baseGTRecord;
-                        }
-
-                        @Override
-                        public void remove() {
-                            throw new UnsupportedOperationException();
-                        }
-                    };
-                }
-
-                @Override
-                public void close() throws IOException {
-                }
-
-                @Override
-                public GTInfo getInfo() {
-                    return baseCuboidGT.getInfo();
-                }
-
-                @Override
-                public int getScannedRowCount() {
-                    return 0;
-                }
-
-                @Override
-                public int getScannedRowBlockCount() {
-                    return 0;
-                }
-            };
-
-            Pair<BitSet, BitSet> dimensionMetricsBitSet = getDimensionAndMetricColumnBitSet(baseCuboidId);
-            GTScanRequest req = new GTScanRequest(baseCuboidGT.getInfo(), null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null);
-            IGTScanner aggregationScanner = new GTAggregateScanner(queueScanner, req);
-
-            int counter = 0;
-            for (GTRecord r : aggregationScanner) {
-                baseGTBuilder.write(r);
-                counter++;
-            }
-            baseGTBuilder.close();
-            aggregationScanner.close();
-
-            logger.info("Base cuboid has " + counter + " rows;");
-            SimpleGridTableTree tree = new SimpleGridTableTree();
-            tree.data = baseCuboidGT;
-            tree.id = baseCuboidId;
-            tree.parent = null;
-            if (counter > 0) {
-                List<Long> children = cuboidScheduler.getSpanningCuboid(baseCuboidId);
-                Collections.sort(children);
-                for (Long childId : children) {
-                    createNDCuboidGT(tree, baseCuboidId, childId);
-                }
-            }
-            outputGT(baseCuboidId, baseCuboidGT);
-            dropStore(baseCuboidGT);
-
-        } catch (IOException e) {
-            logger.error("Fail to build cube", e);
-            throw new RuntimeException(e);
-        }
-
-    }
-
-    private void buildGTRecord(List<String> row, GTRecord record) {
-
-        Object[] dimensions = buildKey(row);
-        Object[] metricsValues = buildValue(row);
-        Object[] recordValues = new Object[dimensions.length + metricsValues.length];
-        System.arraycopy(dimensions, 0, recordValues, 0, dimensions.length);
-        System.arraycopy(metricsValues, 0, recordValues, dimensions.length, metricsValues.length);
-        record.setValues(recordValues);
-    }
-
-    private boolean gc(TreeNode<GridTable> parentNode) {
-        final List<TreeNode<GridTable>> gridTables = parentNode.getAncestorList();
-        logger.info("trying to select node to flush to disk, from:" + StringUtils.join(",", gridTables));
-        for (TreeNode<GridTable> gridTable : gridTables) {
-            final GTComboStore store = (GTComboStore) gridTable.data.getStore();
-            if (store.memoryUsage() > 0) {
-                logger.info("cuboid id:" + gridTable.id + " flush to disk");
-                long t = System.currentTimeMillis();
-                store.switchToDiskStore();
-                logger.info("switch to disk store cost:" + (System.currentTimeMillis() - t) + "ms");
-                waitForGc();
-                return true;
-            }
-        }
-        logger.warn("all ancestor nodes of " + parentNode.id + " has been flushed to disk");
-        return false;
-
-    }
-
-    private GridTable createChildCuboid(final GridTable parentCuboid, final long parentCuboidId, final long cuboidId) {
-        final ExecutorService executorService = Executors.newSingleThreadExecutor();
-        final Future<GridTable> task = executorService.submit(new Callable<GridTable>() {
-            @Override
-            public GridTable call() throws Exception {
-                return aggregateCuboid(parentCuboid, parentCuboidId, cuboidId);
-            }
-        });
-        try {
-            final GridTable gridTable = task.get(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
-            return gridTable;
-        } catch (InterruptedException e) {
-            throw new RuntimeException("this should not happen", e);
-        } catch (ExecutionException e) {
-            if (e.getCause() instanceof OutOfMemoryError) {
-                logger.warn("Future.get() OutOfMemory, stop the thread");
-            } else {
-                throw new RuntimeException("this should not happen", e);
-            }
-        } catch (TimeoutException e) {
-            logger.warn("Future.get() timeout, stop the thread");
-        }
-        logger.info("shutdown executor service");
-        final List<Runnable> runnables = executorService.shutdownNow();
-        try {
-            executorService.awaitTermination(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
-            waitForGc();
-        } catch (InterruptedException e) {
-            throw new RuntimeException("this should not happen", e);
-        }
-        return null;
-
-    }
-
-    private void waitForGc() {
-        System.gc();
-        logger.info("wait 5 seconds for gc");
-        try {
-            Thread.sleep(5000);
-        } catch (InterruptedException e) {
-            throw new RuntimeException("should not happen", e);
-        }
-    }
-
-    private void createNDCuboidGT(SimpleGridTableTree parentNode, long parentCuboidId, long cuboidId) throws IOException {
-
-        long startTime = System.currentTimeMillis();
-
-//        GTComboStore parentStore = (GTComboStore) parentNode.data.getStore();
-//        if (parentStore.memoryUsage() <= 0) {
-//            long t = System.currentTimeMillis();
-//            parentStore.switchToMemStore();
-//            logger.info("node " + parentNode.id + " switch to mem store cost:" + (System.currentTimeMillis() - t) + "ms");
-//        }
-
-        GridTable currentCuboid;
-        while (true) {
-            logger.info("Calculating cuboid " + cuboidId + " from parent " + parentCuboidId);
-            currentCuboid = createChildCuboid(parentNode.data, parentCuboidId, cuboidId);
-            if (currentCuboid != null) {
-                break;
-            } else {
-                logger.warn("create child cuboid:" + cuboidId + " from parent:" + parentCuboidId + " failed, prepare to gc");
-                if (gc(parentNode)) {
-                    continue;
-                } else {
-                    logger.warn("all parent node has been flushed into disk, memory is still insufficient");
-                    throw new RuntimeException("all parent node has been flushed into disk, memory is still insufficient");
-                }
-            }
-        }
-        SimpleGridTableTree node = new SimpleGridTableTree();
-        node.parent = parentNode;
-        node.data = currentCuboid;
-        node.id = cuboidId;
-        parentNode.children.add(node);
-
-        logger.info("Cuboid " + cuboidId + " build takes " + (System.currentTimeMillis() - startTime) + "ms");
-
-        List<Long> children = cuboidScheduler.getSpanningCuboid(cuboidId);
-        if (!children.isEmpty()) {
-            Collections.sort(children); // sort cuboids
-            for (Long childId : children) {
-                createNDCuboidGT(node, cuboidId, childId);
-            }
-        }
-
-
-        //output the grid table
-        outputGT(cuboidId, currentCuboid);
-        dropStore(currentCuboid);
-        parentNode.children.remove(node);
-        if (parentNode.children.size() > 0) {
-            logger.info("cuboid:" + cuboidId + " has finished, parent node:" + parentNode.id + " need to switch to mem store");
-            ((GTComboStore) parentNode.data.getStore()).switchToMemStore();
-        }
-    }
-
-    private void dropStore(GridTable gt) throws IOException {
-        ((GTComboStore) gt.getStore()).drop();
-    }
-
-
-    private void outputGT(Long cuboidId, GridTable gridTable) throws IOException {
-        long startTime = System.currentTimeMillis();
-        GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
-        IGTScanner scanner = gridTable.scan(req);
-        for (GTRecord record : scanner) {
-            this.gtRecordWriter.write(cuboidId, record);
-        }
-        logger.info("Cuboid" + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
-    }
-
-    private static class TreeNode<T> {
-        T data;
-        long id;
-        TreeNode<T> parent;
-        List<TreeNode<T>> children = Lists.newArrayList();
-
-        List<TreeNode<T>> getAncestorList() {
-            ArrayList<TreeNode<T>> result = Lists.newArrayList();
-            TreeNode<T> parent = this;
-            while (parent != null) {
-                result.add(parent);
-                parent = parent.parent;
-            }
-            return Lists.reverse(result);
-        }
-
-        @Override
-        public String toString() {
-            return id + "";
-        }
-    }
-
-    private static class SimpleGridTableTree extends TreeNode<GridTable> {
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
index 4454e43..4efff16 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
@@ -20,6 +20,7 @@ import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.streaming.cube.InMemCubeBuilder;
 
 import java.io.IOException;
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
index b8e1ffe..283bed6 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
@@ -16,6 +16,7 @@ import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.streaming.cube.IGTRecordWriter;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
new file mode 100644
index 0000000..82892dc
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
@@ -0,0 +1,403 @@
+package org.apache.kylin.job.streaming;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.persistence.HBaseConnection;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.dict.lookup.ReadableTable;
+import org.apache.kylin.dict.lookup.TableSignature;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsReducer;
+import org.apache.kylin.job.hadoop.cubev2.InMemKeyValueCreator;
+import org.apache.kylin.job.hadoop.hbase.CreateHTableJob;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.cube.CuboidToGridTableMapping;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.streaming.SEOJsonStreamParser;
+import org.apache.kylin.streaming.StreamBuilder;
+import org.apache.kylin.streaming.StreamMessage;
+import org.apache.kylin.streaming.cube.IGTRecordWriter;
+import org.apache.kylin.streaming.cube.InMemCubeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ */
+public class CubeStreamBuilder extends StreamBuilder {
+
+    private static final Logger logger = LoggerFactory.getLogger(CubeStreamBuilder.class);
+
+    private final CubeManager cubeManager;
+    private final String cubeName;
+    private final KylinConfig kylinConfig;
+    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+
+    public CubeStreamBuilder(BlockingQueue<StreamMessage> streamMessageQueue, String cubeName) {
+        super(streamMessageQueue);
+        this.kylinConfig = KylinConfig.getInstanceFromEnv();
+        this.cubeManager = CubeManager.getInstance(kylinConfig);
+        this.cubeName = cubeName;
+        setStreamParser(new SEOJsonStreamParser(cubeManager.getCube(cubeName).getAllColumns()));
+    }
+
+    @Override
+    protected void build(List<StreamMessage> streamMessages) throws Exception {
+        if (CollectionUtils.isEmpty(streamMessages)) {
+            logger.info("nothing to build, skip to next iteration");
+            return;
+        }
+        final List<List<String>> parsedStreamMessages = parseStream(streamMessages);
+        long startOffset = streamMessages.get(0).getOffset();
+        long endOffset = streamMessages.get(streamMessages.size() - 1).getOffset();
+        LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue<List<String>>(parsedStreamMessages);
+        blockingQueue.put(Collections.<String>emptyList());
+
+        final CubeInstance cubeInstance = cubeManager.getCube(cubeName);
+        final CubeDesc cubeDesc = cubeInstance.getDescriptor();
+        final CubeSegment cubeSegment = cubeManager.appendSegments(cubeManager.getCube(cubeName), System.currentTimeMillis());
+        final Map<Long, HyperLogLogPlusCounter> samplingResult = sampling(cubeInstance.getDescriptor(), parsedStreamMessages);
+
+        final Configuration conf = HadoopUtil.getCurrentConfiguration();
+        final String outputPath = "/tmp/kylin/cuboidstatistics/" + UUID.randomUUID().toString();
+        FactDistinctColumnsReducer.writeCuboidStatistics(conf, outputPath, samplingResult, 100);
+        ResourceStore.getStore(kylinConfig).putResource(cubeSegment.getStatisticsResourcePath(), FileSystem.get(conf).open(new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION)), 0);
+
+        final Map<TblColRef, Dictionary<?>> dictionaryMap = buildDictionary(getTblColRefMap(cubeInstance), parsedStreamMessages);
+        writeDictionary(cubeSegment, dictionaryMap, startOffset, endOffset);
+
+        final HTableInterface hTable = createHTable(cubeSegment);
+
+        final CubeStreamRecordWriter gtRecordWriter = new CubeStreamRecordWriter(cubeDesc, hTable);
+        InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(blockingQueue, cubeInstance,
+                dictionaryMap, gtRecordWriter);
+
+        executorService.submit(inMemCubeBuilder).get();
+        gtRecordWriter.flush();
+        commitSegment(cubeSegment);
+    }
+
+    private void writeDictionary(CubeSegment cubeSegment, Map<TblColRef, Dictionary<?>> dictionaryMap, long startOffset, long endOffset) {
+        for (Map.Entry<TblColRef, Dictionary<?>> entry : dictionaryMap.entrySet()) {
+            final TblColRef tblColRef = entry.getKey();
+            final Dictionary<?> dictionary = entry.getValue();
+            TableSignature signature = new TableSignature();
+            signature.setLastModifiedTime(System.currentTimeMillis());
+            signature.setPath(String.format("streaming_%s_%s", startOffset, endOffset));
+            signature.setSize(endOffset - startOffset);
+            DictionaryInfo dictInfo = new DictionaryInfo(tblColRef.getTable(),
+                    tblColRef.getName(),
+                    tblColRef.getColumnDesc().getZeroBasedIndex(),
+                    tblColRef.getDatatype(),
+                    signature,
+                    ReadableTable.DELIM_AUTO);
+            logger.info("writing dictionary for TblColRef:" + tblColRef.toString());
+            DictionaryManager dictionaryManager = DictionaryManager.getInstance(kylinConfig);
+            try {
+                cubeSegment.putDictResPath(tblColRef, dictionaryManager.trySaveNewDict(dictionary, dictInfo).getResourcePath());
+            } catch (IOException e) {
+                logger.error("error save dictionary for column:" + tblColRef, e);
+                throw new RuntimeException("error save dictionary for column:" + tblColRef, e);
+            }
+        }
+    }
+
+    private class CubeStreamRecordWriter implements IGTRecordWriter {
+        final List<InMemKeyValueCreator> keyValueCreators;
+        final int nColumns;
+        final HTableInterface hTable;
+        private final ByteBuffer byteBuffer;
+        private final CubeDesc cubeDesc;
+        private List<Put> puts = Lists.newArrayList();
+
+        private CubeStreamRecordWriter(CubeDesc cubeDesc, HTableInterface hTable) {
+            this.keyValueCreators = Lists.newArrayList();
+            this.cubeDesc = cubeDesc;
+            int startPosition = 0;
+            for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
+                for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
+                    keyValueCreators.add(new InMemKeyValueCreator(colDesc, startPosition));
+                    startPosition += colDesc.getMeasures().length;
+                }
+            }
+            this.nColumns = keyValueCreators.size();
+            this.hTable = hTable;
+            this.byteBuffer = ByteBuffer.allocate(1<<20);
+        }
+
+        private byte[] copy(byte[] array, int offset, int length) {
+            byte[] result = new byte[length];
+            System.arraycopy(array, offset, result, 0, length);
+            return result;
+        }
+
+        private ByteBuffer createKey(Long cuboidId, GTRecord record) {
+            byteBuffer.clear();
+            byteBuffer.put(Bytes.toBytes(cuboidId));
+            final int cardinality = BitSet.valueOf(new long[]{cuboidId}).cardinality();
+            for (int i = 0; i < cardinality; i++) {
+                final ByteArray byteArray = record.get(i);
+                byteBuffer.put(byteArray.array(), byteArray.offset(), byteArray.length());
+            }
+            return byteBuffer;
+        }
+
+        @Override
+        public void write(Long cuboidId, GTRecord record) throws IOException {
+            final ByteBuffer key = createKey(cuboidId, record);
+            final CuboidToGridTableMapping mapping = new CuboidToGridTableMapping(Cuboid.findById(cubeDesc, cuboidId));
+            final BitSet bitSet = new BitSet();
+            bitSet.set(mapping.getDimensionCount(), mapping.getColumnCount());
+            for (int i = 0; i < nColumns; i++) {
+                final KeyValue keyValue = keyValueCreators.get(i).create(key.array(), 0, key.position(), record.getValues(bitSet, new Object[bitSet.cardinality()]));
+                final Put put = new Put(copy(key.array(), 0, key.position()));
+                byte[] family = copy(keyValue.getFamilyArray(), keyValue.getFamilyOffset(), keyValue.getFamilyLength());
+                byte[] qualifier = copy(keyValue.getQualifierArray(), keyValue.getQualifierOffset(), keyValue.getQualifierLength());
+                byte[] value = copy(keyValue.getValueArray(), keyValue.getValueOffset(), keyValue.getValueLength());
+                put.add(family, qualifier, value);
+                puts.add(put);
+            }
+            if (puts.size() >= batchSize()) {
+                flush();
+            }
+        }
+
+        public final void flush() {
+            try {
+                if (!puts.isEmpty()) {
+                    long t = System.currentTimeMillis();
+                    if (hTable != null) {
+                        hTable.put(puts);
+                        hTable.flushCommits();
+                    }
+                    logger.info("commit total " + puts.size() + " puts, totally cost:" + (System.currentTimeMillis() - t) + "ms");
+                    puts.clear();
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private Map<Integer, TblColRef> getTblColRefMap(CubeInstance cubeInstance) {
+        final List<TblColRef> columns = cubeInstance.getAllColumns();
+        final List<TblColRef> allDimensions = cubeInstance.getAllDimensions();
+        final HashMap<Integer, TblColRef> result = Maps.newHashMap();
+        for (int i = 0; i < columns.size(); i++) {
+            final TblColRef tblColRef = columns.get(i);
+            if (allDimensions.contains(tblColRef)) {
+                result.put(i, tblColRef);
+            }
+        }
+        return result;
+    }
+
+    private Map<TblColRef, Dictionary<?>> buildDictionary(final Map<Integer, TblColRef> tblColRefMap, List<List<String>> recordList) throws IOException {
+        HashMap<TblColRef, Dictionary<?>> result = Maps.newHashMap();
+
+        HashMultimap<TblColRef, String> valueMap = HashMultimap.create();
+        for (List<String> row : recordList) {
+            for (int i = 0; i < row.size(); i++) {
+                String cell = row.get(i);
+                if (tblColRefMap.containsKey(i)) {
+                    valueMap.put(tblColRefMap.get(i), cell);
+                }
+            }
+        }
+        for (TblColRef tblColRef : valueMap.keySet()) {
+            final Collection<byte[]> bytes = Collections2.transform(valueMap.get(tblColRef), new Function<String, byte[]>() {
+                @Nullable
+                @Override
+                public byte[] apply(String input) {
+                    return input == null ? null : input.getBytes();
+                }
+            });
+            final Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(tblColRef.getType(), bytes);
+            result.put(tblColRef, dict);
+        }
+        return result;
+    }
+
+    private Map<Long, HyperLogLogPlusCounter> sampling(CubeDesc cubeDesc, List<List<String>> streams) {
+        CubeJoinedFlatTableDesc intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
+        final int rowkeyLength = cubeDesc.getRowkey().getRowKeyColumns().length;
+        final List<Long> allCuboidIds = getAllCuboidIds(cubeDesc);
+        final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+        final Map<Long, Integer[]> allCuboidsBitSet = Maps.newHashMap();
+
+
+        Lists.transform(allCuboidIds, new Function<Long, Integer[]>() {
+            @Nullable
+            @Override
+            public Integer[] apply(@Nullable Long cuboidId) {
+                BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
+                Integer[] result = new Integer[bitSet.cardinality()];
+
+                long mask = Long.highestOneBit(baseCuboidId);
+                int position = 0;
+                for (int i = 0; i < rowkeyLength; i++) {
+                    if ((mask & cuboidId) > 0) {
+                        result[position] = i;
+                        position++;
+                    }
+                    mask = mask >> 1;
+                }
+                return result;
+            }
+        });
+        final Map<Long, HyperLogLogPlusCounter> result = Maps.newHashMapWithExpectedSize(allCuboidIds.size());
+        for (Long cuboidId : allCuboidIds) {
+            result.put(cuboidId, new HyperLogLogPlusCounter(14));
+            BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
+            Integer[] cuboidBitSet = new Integer[bitSet.cardinality()];
+
+            long mask = Long.highestOneBit(baseCuboidId);
+            int position = 0;
+            for (int i = 0; i < rowkeyLength; i++) {
+                if ((mask & cuboidId) > 0) {
+                    cuboidBitSet[position] = i;
+                    position++;
+                }
+                mask = mask >> 1;
+            }
+            allCuboidsBitSet.put(cuboidId, cuboidBitSet);
+        }
+
+        HashFunction hf = Hashing.murmur3_32();
+        ByteArray[] row_hashcodes = new ByteArray[rowkeyLength];
+        for (int i = 0; i < rowkeyLength; i++) {
+            row_hashcodes[i] = new ByteArray();
+        }
+        for (List<String> row : streams) {
+            //generate hash for each row key column
+            for (int i = 0; i < rowkeyLength; i++) {
+                Hasher hc = hf.newHasher();
+                final String cell = row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]);
+                if (cell != null) {
+                    row_hashcodes[i].set(hc.putString(cell).hash().asBytes());
+                } else {
+                    row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
+                }
+            }
+
+            for (Map.Entry<Long, HyperLogLogPlusCounter> longHyperLogLogPlusCounterEntry : result.entrySet()) {
+                Long cuboidId = longHyperLogLogPlusCounterEntry.getKey();
+                HyperLogLogPlusCounter counter = longHyperLogLogPlusCounterEntry.getValue();
+                Hasher hc = hf.newHasher();
+                final Integer[] cuboidBitSet = allCuboidsBitSet.get(cuboidId);
+                for (int position = 0; position < cuboidBitSet.length; position++) {
+                    hc.putBytes(row_hashcodes[cuboidBitSet[position]].array());
+                }
+                counter.add(hc.hash().asBytes());
+            }
+        }
+        return result;
+    }
+
+    private void commitSegment(CubeSegment cubeSegment) throws IOException {
+        cubeSegment.setStatus(SegmentStatusEnum.READY);
+        CubeManager.getInstance(kylinConfig).updateCube(cubeSegment.getCubeInstance(), true);
+    }
+
+    private List<Long> getAllCuboidIds(CubeDesc cubeDesc) {
+        final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+        List<Long> result = Lists.newArrayList();
+        CuboidScheduler cuboidScheduler = new CuboidScheduler(cubeDesc);
+        getSubCuboidIds(cuboidScheduler, baseCuboidId, result);
+        return result;
+    }
+
+    private void getSubCuboidIds(CuboidScheduler cuboidScheduler, long parentCuboidId, List<Long> result) {
+        result.add(parentCuboidId);
+        for (Long cuboidId: cuboidScheduler.getSpanningCuboid(parentCuboidId)) {
+            getSubCuboidIds(cuboidScheduler, cuboidId, result);
+        }
+    }
+
+
+    private List<List<String>> parseStream(List<StreamMessage> streamMessages) {
+        return Lists.transform(streamMessages, new Function<StreamMessage, List<String>>() {
+            @Nullable
+            @Override
+            public List<String> apply(StreamMessage input) {
+                return getStreamParser().parse(input);
+            }
+        });
+    }
+
+    private HTableInterface createHTable(final CubeSegment cubeSegment) throws Exception {
+        final String hTableName = cubeSegment.getStorageLocationIdentifier();
+        String[] args = new String[]{"-cubename", cubeName,
+                "-segmentname", cubeSegment.getName(),
+                "-input", "/empty",
+                "-htablename", hTableName,
+                "-statisticsenabled", "true"};
+        ToolRunner.run(new CreateHTableJob(), args);
+        final HTableInterface hTable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(hTableName);
+        logger.info("hTable:" + hTableName + " for segment:" + cubeSegment.getName() + " created!");
+        return hTable;
+    }
+
+    private void loadToHTable(String hTableName) throws IOException {
+        final HTableInterface table = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(hTableName);
+
+    }
+
+    @Override
+    protected void onStop() {
+
+    }
+
+    @Override
+    protected int batchInterval() {
+        return 30 * 60 * 1000;//30 min
+    }
+
+    @Override
+    protected int batchSize() {
+        return 1000;
+    }
+}
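For completeness, the expected way to run the new builder (see StreamingBootstrap below) presumably looks roughly like the following. That the builder is submitted directly to an executor is an assumption based on how the II streaming path is started, and the cube name is a made-up example:

    BlockingQueue<StreamMessage> streamQueue = consumer.getStreamQueue(0);   // queue filled by a running KafkaConsumer
    CubeStreamBuilder streamBuilder = new CubeStreamBuilder(streamQueue, "example_streaming_cube"); // hypothetical cube name
    Executors.newSingleThreadExecutor().submit(streamBuilder);  // parent StreamBuilder drives build() in batches (batchSize()/batchInterval())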

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 7854fd5..3929098 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -35,6 +35,7 @@
 package org.apache.kylin.job.streaming;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import kafka.api.OffsetRequest;
 import kafka.cluster.Broker;
@@ -42,20 +43,22 @@ import kafka.javaapi.PartitionMetadata;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.HBaseConnection;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.streaming.*;
 import org.apache.kylin.streaming.invertedindex.IIStreamBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.*;
 
 /**
  */
@@ -65,14 +68,12 @@ public class StreamingBootstrap {
 
     private KylinConfig kylinConfig;
     private StreamingManager streamingManager;
-    private IIManager iiManager;
 
     private Map<String, KafkaConsumer> kafkaConsumers = Maps.newConcurrentMap();
 
     private StreamingBootstrap(KylinConfig kylinConfig) {
         this.kylinConfig = kylinConfig;
         this.streamingManager = StreamingManager.getInstance(kylinConfig);
-        this.iiManager = IIManager.getInstance(kylinConfig);
     }
 
     public static StreamingBootstrap getInstance(KylinConfig kylinConfig) {
@@ -115,10 +116,73 @@ public class StreamingBootstrap {
     public void start(String streaming, int partitionId) throws Exception {
         final KafkaConfig kafkaConfig = streamingManager.getKafkaConfig(streaming);
         Preconditions.checkArgument(kafkaConfig != null, "cannot find kafka config:" + streaming);
-        final IIInstance ii = iiManager.getII(kafkaConfig.getIiName());
-        Preconditions.checkNotNull(ii, "cannot find ii name:" + kafkaConfig.getIiName());
+
         final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaConfig).getPartitionIds().size();
         Preconditions.checkArgument(partitionId >= 0 && partitionId < partitionCount, "invalid partition id:" + partitionId);
+
+        if (!StringUtils.isEmpty(kafkaConfig.getIiName())) {
+            startIIStreaming(kafkaConfig, partitionId, partitionCount);
+        } else if (!StringUtils.isEmpty(kafkaConfig.getCubeName())) {
+            startCubeStreaming(kafkaConfig, partitionId, partitionCount);
+        } else {
+            throw new IllegalArgumentException("no cube or ii in kafka config");
+        }
+    }
+
+    private List<BlockingQueue<StreamMessage>> consume(KafkaConfig kafkaConfig, final int partitionCount) {
+        List<BlockingQueue<StreamMessage>> result = Lists.newArrayList();
+        // consume at most the first 10 partitions of the topic
+        for (int partitionId = 0; partitionId < partitionCount && partitionId < 10; ++partitionId) {
+            final Broker leadBroker = getLeadBroker(kafkaConfig, partitionId);
+            long streamingOffset = getEarliestStreamingOffset(kafkaConfig.getName(), 0, 0);
+            final long latestOffset = KafkaRequester.getLastOffset(kafkaConfig.getTopic(), partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaConfig);
+            streamingOffset = Math.max(streamingOffset, latestOffset);
+            KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), partitionId,
+                    streamingOffset, kafkaConfig.getBrokers(), kafkaConfig, 1);
+            Executors.newSingleThreadExecutor().submit(consumer);
+            result.add(consumer.getStreamQueue(0));
+        }
+        return result;
+    }
+
+    private void startCubeStreaming(KafkaConfig kafkaConfig, final int partitionId, final int partitionCount) throws Exception {
+        final String cubeName = kafkaConfig.getCubeName();
+        final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+
+        final List<BlockingQueue<StreamMessage>> queues = consume(kafkaConfig, partitionCount);
+        final LinkedBlockingDeque<StreamMessage> streamQueue = new LinkedBlockingDeque<>();
+        Executors.newSingleThreadExecutor().execute(new Runnable() {
+            @Override
+            public void run() {
+                while (true) {
+                    for (BlockingQueue<StreamMessage> queue : queues) {
+                        try {
+                            streamQueue.put(queue.take());
+                        } catch (InterruptedException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                }
+            }
+        });
+        CubeStreamBuilder cubeStreamBuilder = new CubeStreamBuilder(streamQueue, cubeName);
+        cubeStreamBuilder.setStreamParser(getStreamParser(kafkaConfig, cubeInstance.getAllColumns()));
+        final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
+        future.get();
+    }
+
+    private StreamParser getStreamParser(KafkaConfig kafkaConfig, List<TblColRef> columns) throws Exception {
+        if (!StringUtils.isEmpty(kafkaConfig.getParserName())) {
+            Class clazz = Class.forName(kafkaConfig.getParserName());
+            Constructor constructor = clazz.getConstructor(List.class);
+            return (StreamParser) constructor.newInstance(columns);
+        } else {
+            return new JsonStreamParser(columns);
+        }
+    }
+
+    private void startIIStreaming(KafkaConfig kafkaConfig, final int partitionId, final int partitionCount) throws Exception {
+        final IIInstance ii = IIManager.getInstance(this.kylinConfig).getII(kafkaConfig.getIiName());
+        Preconditions.checkNotNull(ii, "cannot find ii name:" + kafkaConfig.getIiName());
         Preconditions.checkArgument(ii.getSegments().size() > 0);
         final IISegment iiSegment = ii.getSegments().get(0);
 
@@ -129,7 +193,8 @@ public class StreamingBootstrap {
         final int parallelism = shard / partitionCount;
         final int startShard = partitionId * parallelism;
         final int endShard = startShard + parallelism;
-        long streamingOffset = getEarliestStreamingOffset(streaming, startShard, endShard);
+
+        long streamingOffset = getEarliestStreamingOffset(kafkaConfig.getName(), startShard, endShard);
         streamingOffset = streamingOffset - (streamingOffset % parallelism);
         logger.info("offset from ii desc is " + streamingOffset);
         final long earliestOffset = KafkaRequester.getLastOffset(kafkaConfig.getTopic(), partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaConfig);
@@ -143,21 +208,14 @@ public class StreamingBootstrap {
         }
 
         KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), partitionId, streamingOffset, kafkaConfig.getBrokers(), kafkaConfig, parallelism);
-        kafkaConsumers.put(getKey(streaming, partitionId), consumer);
+        kafkaConsumers.put(getKey(kafkaConfig.getName(), partitionId), consumer);
+
 
-        StreamParser parser;
-        if (!StringUtils.isEmpty(kafkaConfig.getParserName())) {
-            Class clazz = Class.forName(kafkaConfig.getParserName());
-            Constructor constructor = clazz.getConstructor(List.class);
-            parser = (StreamParser) constructor.newInstance(ii.getDescriptor().listAllColumns());
-        } else {
-            parser = new JsonStreamParser(ii.getDescriptor().listAllColumns());
-        }
         Executors.newSingleThreadExecutor().submit(consumer);
         final ExecutorService streamingBuilderPool = Executors.newFixedThreadPool(parallelism);
         for (int i = startShard; i < endShard; ++i) {
-            final IIStreamBuilder task = new IIStreamBuilder(consumer.getStreamQueue(i % parallelism), streaming, iiSegment.getStorageLocationIdentifier(), iiSegment.getIIDesc(), i);
-            task.setStreamParser(parser);
+            final IIStreamBuilder task = new IIStreamBuilder(consumer.getStreamQueue(i % parallelism), kafkaConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiSegment.getIIDesc(), i);
+            task.setStreamParser(getStreamParser(kafkaConfig, ii.getDescriptor().listAllColumns()));
             if (i == endShard - 1) {
                 streamingBuilderPool.submit(task).get();
             } else {

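getStreamParser() above instantiates the class named by parserName reflectively, so any custom parser must expose a public constructor taking a List<TblColRef> and implement StreamParser.parse(StreamMessage). A sketch of such a parser follows; the class name and the comma delimiter are invented for illustration.

package org.apache.kylin.streaming; // hypothetical location for the sketch

import org.apache.kylin.metadata.model.TblColRef;

import java.util.Arrays;
import java.util.List;

/** Illustrative only: a delimiter-based parser that could be plugged in via parserName. */
public class CsvStreamParser implements StreamParser {

    private final List<TblColRef> allColumns;

    // StreamingBootstrap.getStreamParser() looks up exactly this constructor shape via reflection
    public CsvStreamParser(List<TblColRef> allColumns) {
        this.allColumns = allColumns;
    }

    @Override
    public List<String> parse(StreamMessage streamMessage) {
        String[] fields = new String(streamMessage.getRawData()).split(",");
        // assume the message carries values in column order; pad or truncate to the column count
        return Arrays.asList(Arrays.copyOf(fields, allColumns.size()));
    }
}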
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
index a437bba..319d7fa 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
@@ -55,11 +55,11 @@ import org.apache.kylin.cube.model.DimensionDesc;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.DictionaryGenerator;
 import org.apache.kylin.dict.lookup.HiveTableReader;
-import org.apache.kylin.job.hadoop.cubev2.IGTRecordWriter;
-import org.apache.kylin.job.hadoop.cubev2.InMemCubeBuilder;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.streaming.cube.IGTRecordWriter;
+import org.apache.kylin.streaming.cube.InMemCubeBuilder;
 import org.junit.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/job/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java b/job/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java
index 2432220..71c9644 100644
--- a/job/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java
+++ b/job/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java
@@ -34,19 +34,19 @@
 
 package org.apache.kylin.job;
 
-import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractKylinTestCase;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.job.hadoop.cube.StorageCleanupJob;
 import org.apache.kylin.job.streaming.StreamingBootstrap;
-import org.junit.*;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.IOException;
 
 /**
  */
@@ -63,30 +63,6 @@ public class ITKafkaBasedIIStreamBuilderTest {
         System.setProperty("hdp.version", "2.2.4.2-2"); // mapred-site.xml ref this
     }
 
-    @AfterClass
-    public static void afterClass() throws Exception {
-//        backup();
-    }
-
-    private static void backup() throws Exception {
-        int exitCode = cleanupOldStorage();
-        if (exitCode == 0) {
-            exportHBaseData();
-        }
-    }
-
-    private static int cleanupOldStorage() throws Exception {
-        String[] args = {"--delete", "true"};
-
-        int exitCode = ToolRunner.run(new StorageCleanupJob(), args);
-        return exitCode;
-    }
-
-    private static void exportHBaseData() throws IOException {
-        ExportHBaseData export = new ExportHBaseData();
-        export.exportTables();
-    }
-
     @Before
     public void before() throws Exception {
         HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/job/src/test/java/org/apache/kylin/job/InMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/InMemCubeBuilderTest.java b/job/src/test/java/org/apache/kylin/job/InMemCubeBuilderTest.java
index c4bda5f..781273d 100644
--- a/job/src/test/java/org/apache/kylin/job/InMemCubeBuilderTest.java
+++ b/job/src/test/java/org/apache/kylin/job/InMemCubeBuilderTest.java
@@ -38,10 +38,10 @@ import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.DictionaryGenerator;
 import org.apache.kylin.dict.lookup.FileTableReader;
-import org.apache.kylin.job.hadoop.cubev2.IGTRecordWriter;
-import org.apache.kylin.job.hadoop.cubev2.InMemCubeBuilder;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.streaming.cube.IGTRecordWriter;
+import org.apache.kylin.streaming.cube.InMemCubeBuilder;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilderBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilderBenchmarkTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilderBenchmarkTest.java
deleted file mode 100644
index f61aa66..0000000
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilderBenchmarkTest.java
+++ /dev/null
@@ -1,119 +0,0 @@
-package org.apache.kylin.job.hadoop.cubev2;
-
-import com.google.common.collect.Maps;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-
-/**
- */
-@Ignore
-public class InMemCubeBuilderBenchmarkTest extends LocalFileMetadataTestCase {
-
-    private static final Logger logger = LoggerFactory.getLogger(InMemCubeBuilderBenchmarkTest.class);
-
-    private static final int BENCHMARK_RECORD_LIMIT = 2000000;
-    private static final String CUBE_NAME = "test_kylin_cube_with_slr_1_new_segment";
-
-    @Before
-    public void setUp() throws Exception {
-        this.createTestMetadata();
-    }
-
-    @After
-    public void after() throws Exception {
-        this.cleanupTestMetadata();
-    }
-
-    private Map<TblColRef, Dictionary<?>> getDictionaryMap(CubeSegment cubeSegment) {
-        final Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
-        final CubeDesc desc = cubeSegment.getCubeDesc();
-        for (DimensionDesc dim : desc.getDimensions()) {
-            // dictionary
-            for (TblColRef col : dim.getColumnRefs()) {
-                if (desc.getRowkey().isUseDictionary(col)) {
-                    Dictionary dict = cubeSegment.getDictionary(col);
-                    if (dict == null) {
-                        throw new IllegalArgumentException("Dictionary for " + col + " was not found.");
-                    }
-                    logger.info("Dictionary for " + col + " was put into dictionary map.");
-                    dictionaryMap.put(col, cubeSegment.getDictionary(col));
-                }
-            }
-        }
-        return dictionaryMap;
-    }
-
-    private static class ConsoleGTRecordWriter implements IGTRecordWriter {
-
-        boolean verbose = false;
-
-        @Override
-        public void write(Long cuboidId, GTRecord record) throws IOException {
-            if (verbose)
-                System.out.println(record.toString());
-        }
-    }
-
-    private void loadDataFromLocalFile(LinkedBlockingQueue queue) throws IOException, InterruptedException {
-        BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("../table.txt")));
-        String line;
-        int counter = 0;
-        while ((line = br.readLine()) != null) {
-            queue.put(Arrays.asList(line.split("\t")));
-            counter++;
-            if (counter == BENCHMARK_RECORD_LIMIT) {
-                break;
-            }
-        }
-        queue.put(Collections.emptyList());
-    }
-
-    private void loadDataFromRandom(LinkedBlockingQueue queue) throws IOException, InterruptedException {
-        queue.put(Collections.emptyList());
-    }
-
-
-    @Test
-    public void test() throws Exception {
-        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        final CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
-        final CubeInstance cube = cubeManager.getCube(CUBE_NAME);
-        final CubeSegment cubeSegment = cube.getFirstSegment();
-
-        LinkedBlockingQueue queue = new LinkedBlockingQueue<List<String>>();
-
-        InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube, getDictionaryMap(cubeSegment), new ConsoleGTRecordWriter());
-        ExecutorService executorService = Executors.newSingleThreadExecutor();
-        Future<?> future = executorService.submit(cubeBuilder);
-        loadDataFromLocalFile(queue);
-        future.get();
-        logger.info("stream build finished");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java
new file mode 100644
index 0000000..9d62fda
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java
@@ -0,0 +1,76 @@
+package org.apache.kylin.job.streaming;
+
+import org.apache.hadoop.util.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.job.DeployUtil;
+import org.apache.kylin.streaming.StreamMessage;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+
+/**
+ */
+public class CubeStreamBuilderTest {
+
+    private static final Logger logger = LoggerFactory.getLogger(CubeStreamBuilderTest.class);
+
+    private KylinConfig kylinConfig;
+
+    private static final String CUBE_NAME = "test_kylin_cube_without_slr_left_join_ready";
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+        System.setProperty("hdp.version", "2.2.0.0-2041"); // mapred-site.xml ref this
+    }
+
+    @Before
+    public void before() throws Exception {
+        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+        DeployUtil.initCliWorkDir();
+        DeployUtil.deployMetadata();
+        DeployUtil.overrideJobJarLocations();
+        final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(CUBE_NAME);
+        cube.getSegments().clear();
+        CubeManager.getInstance(kylinConfig).updateCube(cube, true);
+
+    }
+
+    @Test
+    public void test() throws Exception {
+        LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<>();
+        CubeStreamBuilder cubeStreamBuilder = new CubeStreamBuilder(queue, CUBE_NAME);
+        final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
+        loadDataFromLocalFile(queue, 100000);
+        future.get();
+    }
+
+    private void loadDataFromLocalFile(BlockingQueue<StreamMessage> queue, final int maxCount) throws IOException, InterruptedException {
+        BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("../table.txt")));
+        String line;
+        int count = 0;
+        while ((line = br.readLine()) != null && count++ < maxCount) {
+            final List<String> strings = Arrays.asList(line.split("\t"));
+            queue.put(new StreamMessage(System.currentTimeMillis(), StringUtils.join(",", strings).getBytes()));
+        }
+        queue.put(StreamMessage.EOF);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java b/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
index f880b73..0d720ab 100644
--- a/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
+++ b/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
@@ -29,6 +29,7 @@ import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.storage.cache.CacheFledgedDynamicStorageEngine;
 import org.apache.kylin.storage.cache.CacheFledgedStaticStorageEngine;
+import org.apache.kylin.storage.hbase.CubeStorageEngine;
 import org.apache.kylin.storage.hbase.InvertedIndexStorageEngine;
 import org.apache.kylin.storage.hybrid.HybridInstance;
 import org.apache.kylin.storage.hybrid.HybridStorageEngine;
@@ -51,7 +52,7 @@ public class StorageEngineFactory {
                 return ret;
             }
         } else if (realization.getType() == RealizationType.CUBE) {
-            ICachableStorageEngine ret = new org.apache.kylin.storage.cube.CubeStorageEngine((CubeInstance) realization);
+            ICachableStorageEngine ret = new CubeStorageEngine((CubeInstance) realization);
             if (allowStorageLayerCache) {
                 return wrapWithCache(ret, realization);
             } else {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
index 9949c96..b6f5025 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
@@ -75,6 +75,9 @@ public class KafkaConfig extends RootPersistentEntity {
     @JsonProperty("iiName")
     private String iiName;
 
+    @JsonProperty("cubeName")
+    private String cubeName;
+
     @JsonProperty("parserName")
     private String parserName;
 
@@ -86,7 +89,6 @@ public class KafkaConfig extends RootPersistentEntity {
         this.parserName = parserName;
     }
 
-
     public int getTimeout() {
         return timeout;
     }
@@ -133,6 +135,14 @@ public class KafkaConfig extends RootPersistentEntity {
         });
     }
 
+    public String getCubeName() {
+        return cubeName;
+    }
+
+    public void setCubeName(String cubeName) {
+        this.cubeName = cubeName;
+    }
+
     public String getIiName() {
         return iiName;
     }

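With the new cubeName property, a Kafka streaming config is bound either to an inverted index (iiName) or to a cube (cubeName), and StreamingBootstrap.start() dispatches on whichever is non-empty. A small sketch of that wiring, assuming the no-arg constructor that Jackson relies on:

import org.apache.kylin.streaming.KafkaConfig;

/** Sketch only: shows how cubeName (rather than iiName) selects the cube streaming path. */
public class KafkaConfigCubeExample {

    public static void main(String[] args) {
        KafkaConfig kafkaConfig = new KafkaConfig(); // assumes the default constructor used for JSON deserialization
        kafkaConfig.setCubeName("test_streaming_cube"); // hypothetical cube name; topic/broker settings omitted here
        // leaving iiName empty makes StreamingBootstrap.start() call startCubeStreaming();
        // setting iiName instead would route the config to startIIStreaming()
        System.out.println("bound to cube: " + kafkaConfig.getCubeName());
    }
}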
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/streaming/src/main/java/org/apache/kylin/streaming/SEOJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/SEOJsonStreamParser.java b/streaming/src/main/java/org/apache/kylin/streaming/SEOJsonStreamParser.java
new file mode 100644
index 0000000..5dab7f9
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/SEOJsonStreamParser.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.streaming;
+
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.MapType;
+import com.fasterxml.jackson.databind.type.SimpleType;
+import com.google.common.collect.Lists;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.TimeUtil;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ */
+public final class SEOJsonStreamParser implements StreamParser {
+
+    private static final Logger logger = LoggerFactory.getLogger(SEOJsonStreamParser.class);
+
+    private final List<TblColRef> allColumns;
+    private final ObjectMapper mapper = new ObjectMapper();
+    private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class));
+
+    public SEOJsonStreamParser(List<TblColRef> allColumns) {
+        this.allColumns = allColumns;
+    }
+
+    @Override
+    public List<String> parse(StreamMessage stream) {
+        try {
+            Map<String, String> root = mapper.readValue(stream.getRawData(), mapType);
+            String trafficSource = root.get("trafficsourceid");
+            if ("20".equals(trafficSource) || "21".equals(trafficSource) || "22".equals(trafficSource) || "23".equals(trafficSource)) {
+                ArrayList<String> result = Lists.newArrayList();
+                for (TblColRef column : allColumns) {
+                    String columnName = column.getName();
+                    if (columnName.equalsIgnoreCase("minute_start")) {
+                        result.add(String.valueOf(TimeUtil.getMinuteStart(Long.valueOf(root.get("timestamp")))));
+                    } else if (columnName.equalsIgnoreCase("hour_start")) {
+                        result.add(String.valueOf(TimeUtil.getHourStart(Long.valueOf(root.get("timestamp")))));
+                    } else if (columnName.equalsIgnoreCase("day")) {
+                        // for the day column, emit a yyyy-MM-dd date string rather than a timestamp
+                        long ts = TimeUtil.getDayStart(Long.valueOf(root.get("timestamp")));
+                        result.add(DateFormat.formatToDateStr(ts));
+                    } else {
+                        String x = root.get(columnName.toLowerCase());
+                        result.add(x);
+                    }
+                }
+
+                return result;
+            } else {
+                return null;
+            }
+        } catch (IOException e) {
+            logger.error("error parsing:" + new String(stream.getRawData()), e);
+            return null;
+        }
+    }
+}

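SEOJsonStreamParser derives minute_start, hour_start and day from the message's epoch-millisecond timestamp field, as the branches above show. The same derivation in isolation, with a made-up timestamp value:

import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.TimeUtil;

/** Sketch of the derived time columns emitted for minute_start, hour_start and day. */
public class DerivedTimeColumnsExample {

    public static void main(String[] args) {
        long timestamp = 1432886400000L; // made-up event time in milliseconds since the epoch
        String minuteStart = String.valueOf(TimeUtil.getMinuteStart(timestamp));
        String hourStart = String.valueOf(TimeUtil.getHourStart(timestamp));
        String day = DateFormat.formatToDateStr(TimeUtil.getDayStart(timestamp)); // yyyy-MM-dd for the day column
        System.out.println(minuteStart + " " + hourStart + " " + day);
    }
}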
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index 67cb109..e9cb046 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -48,16 +48,14 @@ public abstract class StreamBuilder implements Runnable {
 
     private static final Logger logger = LoggerFactory.getLogger(StreamBuilder.class);
 
-    private static final int BATCH_BUILD_INTERVAL_THRESHOLD = 2 * 60 * 1000;
-    private final int sliceSize;
+
     private StreamParser streamParser = StringStreamParser.instance;
 
     private BlockingQueue<StreamMessage> streamMessageQueue;
     private long lastBuildTime = System.currentTimeMillis();
 
-    public StreamBuilder(BlockingQueue<StreamMessage> streamMessageQueue, int sliceSize) {
+    public StreamBuilder(BlockingQueue<StreamMessage> streamMessageQueue) {
         this.streamMessageQueue = streamMessageQueue;
-        this.sliceSize = sliceSize;
     }
 
     protected abstract void build(List<StreamMessage> streamsToBuild) throws Exception;
@@ -81,10 +79,11 @@ public abstract class StreamBuilder implements Runnable {
                     logger.warn("stream queue interrupted", e);
                     continue;
                 }
-                if (streamMessage == null) {
-
-                    logger.info("The stream queue is drained, current available stream count: " + streamMessageToBuild.size());
-                    if ((System.currentTimeMillis() - lastBuildTime) > BATCH_BUILD_INTERVAL_THRESHOLD) {
+                if (streamMessage == null || getStreamParser().parse(streamMessage) == null) {
+                    if (streamMessage == null) {
+                        logger.info("The stream queue is drained, current available stream count: " + streamMessageToBuild.size());
+                    }
+                    if ((System.currentTimeMillis() - lastBuildTime) > batchInterval()) {
                         build(streamMessageToBuild);
                         clearCounter();
                         streamMessageToBuild.clear();
@@ -98,7 +97,7 @@ public abstract class StreamBuilder implements Runnable {
                     }
                 }
                 streamMessageToBuild.add(streamMessage);
-                if (streamMessageToBuild.size() >= this.sliceSize) {
+                if (streamMessageToBuild.size() >= batchSize()) {
                     build(streamMessageToBuild);
                     clearCounter();
                     streamMessageToBuild.clear();
@@ -117,4 +116,7 @@ public abstract class StreamBuilder implements Runnable {
     public final void setStreamParser(StreamParser streamParser) {
         this.streamParser = streamParser;
     }
+
+    protected abstract int batchInterval();
+    protected abstract int batchSize();
 }

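After this refactoring a concrete builder no longer passes a slice size to the constructor; it supplies its batching thresholds by overriding batchInterval() and batchSize(), as CubeStreamBuilder and IIStreamBuilder do. A minimal sketch of the new contract, assuming build() and onStop() are the only other hooks a subclass must implement; the class name and thresholds are invented.

import org.apache.kylin.streaming.StreamBuilder;
import org.apache.kylin.streaming.StreamMessage;

import java.util.List;
import java.util.concurrent.BlockingQueue;

/** Illustrative subclass: logs batch sizes instead of building anything. */
public class LoggingStreamBuilder extends StreamBuilder {

    public LoggingStreamBuilder(BlockingQueue<StreamMessage> queue) {
        super(queue); // the slice size is no longer a constructor argument
    }

    @Override
    protected void build(List<StreamMessage> streamsToBuild) throws Exception {
        System.out.println("would build a batch of " + streamsToBuild.size() + " messages");
    }

    @Override
    protected void onStop() {
        // nothing to release in this sketch
    }

    @Override
    protected int batchInterval() {
        return 5 * 60 * 1000; // flush at least every 5 minutes (arbitrary value for the sketch)
    }

    @Override
    protected int batchSize() {
        return 500; // or as soon as 500 messages are queued (arbitrary value for the sketch)
    }
}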
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/streaming/src/main/java/org/apache/kylin/streaming/cube/IGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/cube/IGTRecordWriter.java b/streaming/src/main/java/org/apache/kylin/streaming/cube/IGTRecordWriter.java
new file mode 100644
index 0000000..2d1e97e
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/cube/IGTRecordWriter.java
@@ -0,0 +1,11 @@
+package org.apache.kylin.streaming.cube;
+
+import org.apache.kylin.storage.gridtable.GTRecord;
+
+import java.io.IOException;
+
+/**
+ */
+public interface IGTRecordWriter {
+    void write(Long cuboidId, GTRecord record) throws IOException;
+}
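
A trivial implementation of this contract, modeled on the ConsoleGTRecordWriter used by the benchmark test, simply prints each record; it is shown here as a sketch only.

import org.apache.kylin.storage.gridtable.GTRecord;
import org.apache.kylin.streaming.cube.IGTRecordWriter;

import java.io.IOException;

/** Minimal writer: dumps every record to stdout, handy when debugging an in-memory cube build. */
public class ConsoleGTRecordWriter implements IGTRecordWriter {

    private final boolean verbose;

    public ConsoleGTRecordWriter(boolean verbose) {
        this.verbose = verbose;
    }

    @Override
    public void write(Long cuboidId, GTRecord record) throws IOException {
        if (verbose) {
            System.out.println("cuboid " + cuboidId + ": " + record.toString());
        }
    }
}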