Posted to commits@kylin.apache.org by qh...@apache.org on 2015/06/03 12:04:25 UTC

[1/2] incubator-kylin git commit: KYLIN-808

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8.0 30acdb901 -> 3eee24f94


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3eee24f9/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamConsumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamConsumer.java b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamConsumer.java
new file mode 100644
index 0000000..421daef
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamConsumer.java
@@ -0,0 +1,148 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming.invertedindex;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.HBaseConnection;
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.apache.kylin.invertedindex.model.IIRow;
+import org.apache.kylin.streaming.MicroStreamBatch;
+import org.apache.kylin.streaming.MicroStreamBatchConsumer;
+import org.apache.kylin.streaming.StreamingManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ */
+public class IIStreamConsumer implements MicroStreamBatchConsumer {
+
+    private static Logger logger = LoggerFactory.getLogger(IIStreamConsumer.class);
+
+    private final IIDesc desc;
+    private final HTableInterface hTable;
+    private final SliceBuilder sliceBuilder;
+    private final int shardId;
+    private final String streaming;
+    private StreamingManager streamingManager;
+
+    public IIStreamConsumer(String streaming, String hTableName, IIDesc iiDesc, int shard) {
+        this.streaming = streaming;
+        this.desc = iiDesc;
+        this.shardId = shard;
+        try {
+            this.hTable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(hTableName);
+            this.hTable.setAutoFlushTo(true);
+        } catch (IOException e) {
+            logger.error("cannot open htable name:" + hTableName, e);
+            throw new RuntimeException("cannot open htable name:" + hTableName, e);
+        }
+        this.sliceBuilder = new SliceBuilder(desc, (short) shard, iiDesc.isUseLocalDictionary());
+        this.streamingManager = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv());
+    }
+
+    @Override
+    public void consume(MicroStreamBatch microStreamBatch) throws IOException {
+        if (microStreamBatch.size() > 0) {
+            long offset = microStreamBatch.getOffset().getFirst();
+            if (offset < streamingManager.getOffset(streaming, shardId)) {
+                logger.info("this batch has already been built, skip building");
+                return;
+            }
+            logger.info("stream build start, size:" + microStreamBatch.size());
+            Stopwatch stopwatch = new Stopwatch();
+            stopwatch.start();
+            final Slice slice = sliceBuilder.buildSlice(microStreamBatch);
+            logger.info("slice info, shard:" + slice.getShard() + " timestamp:" + slice.getTimestamp() + " record count:" + slice.getRecordCount());
+
+            loadToHBase(hTable, slice, new IIKeyValueCodec(slice.getInfo()));
+            submitOffset(offset);
+            stopwatch.stop();
+            logger.info("stream build finished, size:" + microStreamBatch.size() + " elapsed time:" + stopwatch.elapsedTime(TimeUnit.MILLISECONDS) + " " + TimeUnit.MILLISECONDS);
+        } else {
+            logger.info("nothing to build, skip building");
+        }
+    }
+
+    @Override
+    public void stop() {
+        try {
+            this.hTable.close();
+        } catch (IOException e) {
+            logger.error("onStop throw exception", e);
+        }
+    }
+
+    private void loadToHBase(HTableInterface hTable, Slice slice, IIKeyValueCodec codec) throws IOException {
+        List<Put> data = Lists.newArrayList();
+        for (IIRow row : codec.encodeKeyValue(slice)) {
+            final byte[] key = row.getKey().get();
+            final byte[] value = row.getValue().get();
+            Put put = new Put(key);
+            put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES, value);
+            final ImmutableBytesWritable dictionary = row.getDictionary();
+            final byte[] dictBytes = dictionary.get();
+            if (dictionary.getOffset() == 0 && dictionary.getLength() == dictBytes.length) {
+                put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_DICTIONARY_BYTES, dictBytes);
+            } else {
+                throw new RuntimeException("dict offset should be 0, and dict length should be " + dictBytes.length + " but they are" + dictionary.getOffset() + " " + dictionary.getLength());
+            }
+            data.add(put);
+        }
+        hTable.put(data);
+        // omit hTable.flushCommits(), because autoflush is enabled on hTable
+    }
+
+    private void submitOffset(long offset) {
+        try {
+            streamingManager.updateOffset(streaming, shardId, offset);
+            logger.info("submit offset:" + offset);
+        } catch (Exception e) {
+            logger.warn("error submit offset: " + offset + " retrying", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+}
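
The consumer above no longer pulls from the Kafka queue itself the way the removed IIStreamBuilder did; the generic StreamBuilder now cuts MicroStreamBatches and hands them to consume(). A minimal wiring sketch, mirroring the call sites in the StreamingBootstrap and BuildIIWithStreamTest hunks below (kafkaConsumer, parser, streamingName and shard here are placeholders, not values from this commit):

    // Minimal wiring sketch; kafkaConsumer, parser, streamingName and shard are placeholders.
    BlockingQueue<StreamMessage> queue = kafkaConsumer.getStreamQueue(shard % parallelism);
    MicroStreamBatchConsumer iiConsumer =
            new IIStreamConsumer(streamingName, iiSegment.getStorageLocationIdentifier(), iiDesc, shard);
    // Cut batches purely by size: one slice worth of messages, no time bound.
    StreamBuilder builder = new StreamBuilder(queue,
            new MicroBatchCondition(iiDesc.getSliceSize(), Integer.MAX_VALUE), iiConsumer);
    builder.setStreamParser(parser); // e.g. obtained via getStreamParser(...) as in StreamingBootstrap
    Executors.newSingleThreadExecutor().submit(builder);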


[2/2] incubator-kylin git commit: KYLIN-808

Posted by qh...@apache.org.
KYLIN-808


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/3eee24f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/3eee24f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/3eee24f9

Branch: refs/heads/0.8.0
Commit: 3eee24f945b2a770d6acb9b60113a21ccaba83fc
Parents: 30acdb9
Author: qianhao.zhou <qi...@ebay.com>
Authored: Tue Jun 2 16:48:23 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Wed Jun 3 18:01:33 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/streaming/CubeStreamBuilder.java  | 383 -------------------
 .../kylin/job/streaming/CubeStreamConsumer.java | 370 ++++++++++++++++++
 .../kylin/job/streaming/StreamingBootstrap.java |  37 +-
 .../apache/kylin/job/BuildIIWithStreamTest.java |   6 +-
 .../kylin/job/hadoop/invertedindex/IITest.java  |  35 +-
 .../job/streaming/CubeStreamBuilderTest.java    |  81 ----
 .../job/streaming/CubeStreamConsumerTest.java   |  83 ++++
 .../kylin/streaming/JsonStreamParser.java       |   2 +
 .../kylin/streaming/MicroBatchCondition.java    |  22 ++
 .../kylin/streaming/MicroStreamBatch.java       |  45 ++-
 .../streaming/MicroStreamBatchConsumer.java     |  11 +
 .../apache/kylin/streaming/StreamBuilder.java   | 199 ++++++----
 .../invertedindex/IIStreamBuilder.java          | 164 --------
 .../invertedindex/IIStreamConsumer.java         | 148 +++++++
 14 files changed, 828 insertions(+), 758 deletions(-)
----------------------------------------------------------------------
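
KYLIN-808 splits the old builder classes: the shared batching loop stays in StreamBuilder, while the per-storage build step moves behind the new MicroStreamBatchConsumer callback. The body of that interface is not included in this mail (it is the new MicroStreamBatchConsumer.java listed above, 11 added lines); judging from the @Override methods in CubeStreamConsumer and IIStreamConsumer it presumably reads roughly as:

    package org.apache.kylin.streaming;

    // Presumed shape of the new callback interface (not shown in this mail);
    // inferred from the @Override methods in CubeStreamConsumer and IIStreamConsumer.
    public interface MicroStreamBatchConsumer {

        // Invoked by StreamBuilder each time a micro batch is cut (by size or by time).
        void consume(MicroStreamBatch microStreamBatch) throws Exception;

        // Invoked when the enclosing StreamBuilder shuts down.
        void stop();
    }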


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3eee24f9/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
deleted file mode 100644
index 6914b73..0000000
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
+++ /dev/null
@@ -1,383 +0,0 @@
-package org.apache.kylin.job.streaming;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.common.persistence.HBaseConnection;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeBuilder;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.cuboid.CuboidScheduler;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.dict.DictionaryGenerator;
-import org.apache.kylin.dict.DictionaryInfo;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.dict.lookup.ReadableTable;
-import org.apache.kylin.dict.lookup.TableSignature;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsReducer;
-import org.apache.kylin.job.hadoop.cubev2.InMemKeyValueCreator;
-import org.apache.kylin.job.hadoop.hbase.CubeHTableUtil;
-import org.apache.kylin.job.inmemcubing.ICuboidWriter;
-import org.apache.kylin.job.inmemcubing.InMemCubeBuilder;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.cube.CuboidToGridTableMapping;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.apache.kylin.streaming.MicroStreamBatch;
-import org.apache.kylin.streaming.StreamBuilder;
-import org.apache.kylin.streaming.StreamMessage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-
-/**
- */
-public class CubeStreamBuilder extends StreamBuilder {
-
-    private static final Logger logger = LoggerFactory.getLogger(CubeStreamBuilder.class);
-
-    private final CubeManager cubeManager;
-    private final String cubeName;
-    private final KylinConfig kylinConfig;
-    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
-
-
-    public CubeStreamBuilder(BlockingQueue<StreamMessage> streamMessageQueue, String cubeName) {
-        super(streamMessageQueue);
-        this.kylinConfig = KylinConfig.getInstanceFromEnv();
-        this.cubeManager = CubeManager.getInstance(kylinConfig);
-        this.cubeName = cubeName;
-    }
-
-    @Override
-    protected void build(MicroStreamBatch microStreamBatch) throws Exception {
-        if (microStreamBatch.size() == 0) {
-            logger.info("nothing to build, skip to next iteration");
-            return;
-        }
-        final List<List<String>> parsedStreamMessages = microStreamBatch.getStreams();
-        long startOffset = microStreamBatch.getOffset().getFirst();
-        long endOffset = microStreamBatch.getOffset().getSecond();
-        LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue<List<String>>(parsedStreamMessages);
-        blockingQueue.put(Collections.<String>emptyList());
-
-        final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
-        final CubeDesc cubeDesc = cubeInstance.getDescriptor();
-        final CubeSegment cubeSegment = cubeManager.appendSegments(cubeManager.getCube(cubeName), System.currentTimeMillis(), false, false);
-        final Map<Long, HyperLogLogPlusCounter> samplingResult = sampling(cubeInstance.getDescriptor(), parsedStreamMessages);
-
-        final Configuration conf = HadoopUtil.getCurrentConfiguration();
-        final Path outputPath = new Path("file:///tmp/cuboidstatistics/" + UUID.randomUUID().toString());
-        FactDistinctColumnsReducer.writeCuboidStatistics(conf, outputPath, samplingResult, 100);
-        ResourceStore.getStore(kylinConfig).putResource(cubeSegment.getStatisticsResourcePath(), FileSystem.getLocal(conf).open(new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION)), 0);
-
-        final Map<TblColRef, Dictionary<?>> dictionaryMap = buildDictionary(getTblColRefMap(cubeInstance), parsedStreamMessages);
-        writeDictionary(cubeSegment, dictionaryMap, startOffset, endOffset);
-
-        final HTableInterface hTable = createHTable(cubeSegment);
-
-        final CubeStreamRecordWriter gtRecordWriter = new CubeStreamRecordWriter(cubeDesc, hTable);
-        InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(blockingQueue, cubeInstance.getDescriptor(),
-                dictionaryMap, gtRecordWriter);
-
-        executorService.submit(inMemCubeBuilder).get();
-        gtRecordWriter.flush();
-        commitSegment(cubeSegment);
-    }
-
-    private void writeDictionary(CubeSegment cubeSegment, Map<TblColRef, Dictionary<?>> dictionaryMap, long startOffset, long endOffset) {
-        for (Map.Entry<TblColRef, Dictionary<?>> entry : dictionaryMap.entrySet()) {
-            final TblColRef tblColRef = entry.getKey();
-            final Dictionary<?> dictionary = entry.getValue();
-            TableSignature signature = new TableSignature();
-            signature.setLastModifiedTime(System.currentTimeMillis());
-            signature.setPath(String.format("streaming_%s_%s", startOffset, endOffset));
-            signature.setSize(endOffset - startOffset);
-            DictionaryInfo dictInfo = new DictionaryInfo(tblColRef.getTable(),
-                    tblColRef.getName(),
-                    tblColRef.getColumnDesc().getZeroBasedIndex(),
-                    tblColRef.getDatatype(),
-                    signature,
-                    ReadableTable.DELIM_AUTO);
-            logger.info("writing dictionary for TblColRef:" + tblColRef.toString());
-            DictionaryManager dictionaryManager = DictionaryManager.getInstance(kylinConfig);
-            try {
-                cubeSegment.putDictResPath(tblColRef, dictionaryManager.trySaveNewDict(dictionary, dictInfo).getResourcePath());
-            } catch (IOException e) {
-                logger.error("error save dictionary for column:" + tblColRef, e);
-                throw new RuntimeException("error save dictionary for column:" + tblColRef, e);
-            }
-        }
-    }
-
-    private class CubeStreamRecordWriter implements ICuboidWriter {
-        final List<InMemKeyValueCreator> keyValueCreators;
-        final int nColumns;
-        final HTableInterface hTable;
-        private final ByteBuffer byteBuffer;
-        private final CubeDesc cubeDesc;
-        private List<Put> puts = Lists.newArrayList();
-
-        private CubeStreamRecordWriter(CubeDesc cubeDesc, HTableInterface hTable) {
-            this.keyValueCreators = Lists.newArrayList();
-            this.cubeDesc = cubeDesc;
-            int startPosition = 0;
-            for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
-                for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
-                    keyValueCreators.add(new InMemKeyValueCreator(colDesc, startPosition));
-                    startPosition += colDesc.getMeasures().length;
-                }
-            }
-            this.nColumns = keyValueCreators.size();
-            this.hTable = hTable;
-            this.byteBuffer = ByteBuffer.allocate(1<<20);
-        }
-
-        private byte[] copy(byte[] array, int offset, int length) {
-            byte[] result = new byte[length];
-            System.arraycopy(array, offset, result, 0, length);
-            return result;
-        }
-
-        private ByteBuffer createKey(Long cuboidId, GTRecord record) {
-            byteBuffer.clear();
-            byteBuffer.put(Bytes.toBytes(cuboidId));
-            final int cardinality = BitSet.valueOf(new long[]{cuboidId}).cardinality();
-            for (int i = 0; i < cardinality; i++) {
-                final ByteArray byteArray = record.get(i);
-                byteBuffer.put(byteArray.array(), byteArray.offset(), byteArray.length());
-            }
-            return byteBuffer;
-        }
-
-        @Override
-        public void write(long cuboidId, GTRecord record) throws IOException {
-            final ByteBuffer key = createKey(cuboidId, record);
-            final CuboidToGridTableMapping mapping = new CuboidToGridTableMapping(Cuboid.findById(cubeDesc, cuboidId));
-            final ImmutableBitSet bitSet = new ImmutableBitSet(mapping.getDimensionCount(), mapping.getColumnCount());
-            for (int i = 0; i < nColumns; i++) {
-                final KeyValue keyValue = keyValueCreators.get(i).create(key.array(), 0, key.position(), record.getValues(bitSet, new Object[bitSet.cardinality()]));
-                final Put put = new Put(copy(key.array(), 0, key.position()));
-                byte[] family = copy(keyValue.getFamilyArray(), keyValue.getFamilyOffset(), keyValue.getFamilyLength());
-                byte[] qualifier = copy(keyValue.getQualifierArray(), keyValue.getQualifierOffset(), keyValue.getQualifierLength());
-                byte[] value = copy(keyValue.getValueArray(), keyValue.getValueOffset(), keyValue.getValueLength());
-                put.add(family, qualifier, value);
-                puts.add(put);
-            }
-            if (puts.size() >= batchSize()) {
-                flush();
-            }
-        }
-
-        public final void flush() {
-            try {
-                if (!puts.isEmpty()) {
-                    long t = System.currentTimeMillis();
-                    if (hTable != null) {
-                        hTable.put(puts);
-                        hTable.flushCommits();
-                    }
-                    logger.info("commit total " + puts.size() + " puts, totally cost:" + (System.currentTimeMillis() - t) + "ms");
-                    puts.clear();
-                }
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
-    private Map<Integer, TblColRef> getTblColRefMap(CubeInstance cubeInstance) {
-        final List<TblColRef> columns = cubeInstance.getAllColumns();
-        final List<TblColRef> allDimensions = cubeInstance.getAllDimensions();
-        final HashMap<Integer, TblColRef> result = Maps.newHashMap();
-        for (int i = 0; i < columns.size(); i++) {
-            final TblColRef tblColRef = columns.get(i);
-            if (allDimensions.contains(tblColRef)) {
-                result.put(i, tblColRef);
-            }
-        }
-        return result;
-    }
-
-    private Map<TblColRef, Dictionary<?>> buildDictionary(final Map<Integer, TblColRef> tblColRefMap, List<List<String>> recordList) throws IOException {
-        HashMap<TblColRef, Dictionary<?>> result = Maps.newHashMap();
-
-        HashMultimap<TblColRef, String> valueMap = HashMultimap.create();
-        for (List<String> row : recordList) {
-            for (int i = 0; i < row.size(); i++) {
-                String cell = row.get(i);
-                if (tblColRefMap.containsKey(i)) {
-                    valueMap.put(tblColRefMap.get(i), cell);
-                }
-            }
-        }
-        for (TblColRef tblColRef : valueMap.keySet()) {
-            final Collection<byte[]> bytes = Collections2.transform(valueMap.get(tblColRef), new Function<String, byte[]>() {
-                @Nullable
-                @Override
-                public byte[] apply(String input) {
-                    return input == null ? null : input.getBytes();
-                }
-            });
-            final Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(tblColRef.getType(), bytes);
-            result.put(tblColRef, dict);
-        }
-        return result;
-    }
-
-    private Map<Long, HyperLogLogPlusCounter> sampling(CubeDesc cubeDesc, List<List<String>> streams) {
-        CubeJoinedFlatTableDesc intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
-        final int rowkeyLength = cubeDesc.getRowkey().getRowKeyColumns().length;
-        final List<Long> allCuboidIds = getAllCuboidIds(cubeDesc);
-        final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-        final Map<Long, Integer[]> allCuboidsBitSet = Maps.newHashMap();
-
-
-        Lists.transform(allCuboidIds, new Function<Long, Integer[]>() {
-            @Nullable
-            @Override
-            public Integer[] apply(@Nullable Long cuboidId) {
-                BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
-                Integer[] result = new Integer[bitSet.cardinality()];
-
-                long mask = Long.highestOneBit(baseCuboidId);
-                int position = 0;
-                for (int i = 0; i < rowkeyLength; i++) {
-                    if ((mask & cuboidId) > 0) {
-                        result[position] = i;
-                        position++;
-                    }
-                    mask = mask >> 1;
-                }
-                return result;
-            }
-        });
-        final Map<Long, HyperLogLogPlusCounter> result = Maps.newHashMapWithExpectedSize(allCuboidIds.size());
-        for (Long cuboidId : allCuboidIds) {
-            result.put(cuboidId, new HyperLogLogPlusCounter(14));
-            BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
-            Integer[] cuboidBitSet = new Integer[bitSet.cardinality()];
-
-            long mask = Long.highestOneBit(baseCuboidId);
-            int position = 0;
-            for (int i = 0; i < rowkeyLength; i++) {
-                if ((mask & cuboidId) > 0) {
-                    cuboidBitSet[position] = i;
-                    position++;
-                }
-                mask = mask >> 1;
-            }
-            allCuboidsBitSet.put(cuboidId, cuboidBitSet);
-        }
-
-        HashFunction hf = Hashing.murmur3_32();
-        ByteArray[] row_hashcodes = new ByteArray[rowkeyLength];
-        for (int i = 0; i < rowkeyLength; i++) {
-            row_hashcodes[i] = new ByteArray();
-        }
-        for (List<String> row : streams) {
-            //generate hash for each row key column
-            for (int i = 0; i < rowkeyLength; i++) {
-                Hasher hc = hf.newHasher();
-                final String cell = row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]);
-                if (cell != null) {
-                    row_hashcodes[i].set(hc.putString(cell).hash().asBytes());
-                } else {
-                    row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
-                }
-            }
-
-            for (Map.Entry<Long, HyperLogLogPlusCounter> longHyperLogLogPlusCounterEntry : result.entrySet()) {
-                Long cuboidId = longHyperLogLogPlusCounterEntry.getKey();
-                HyperLogLogPlusCounter counter = longHyperLogLogPlusCounterEntry.getValue();
-                Hasher hc = hf.newHasher();
-                final Integer[] cuboidBitSet = allCuboidsBitSet.get(cuboidId);
-                for (int position = 0; position < cuboidBitSet.length; position++) {
-                    hc.putBytes(row_hashcodes[cuboidBitSet[position]].array());
-                }
-                counter.add(hc.hash().asBytes());
-            }
-        }
-        return result;
-    }
-
-    //TODO: should we use cubeManager.promoteNewlyBuiltSegments?
-    private void commitSegment(CubeSegment cubeSegment) throws IOException {
-        cubeSegment.setStatus(SegmentStatusEnum.READY);
-        CubeBuilder cubeBuilder = new CubeBuilder(cubeSegment.getCubeInstance());
-        cubeBuilder.setToAddSegs(cubeSegment);
-        CubeManager.getInstance(kylinConfig).updateCube(cubeBuilder);
-    }
-
-    private List<Long> getAllCuboidIds(CubeDesc cubeDesc) {
-        final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-        List<Long> result = Lists.newArrayList();
-        CuboidScheduler cuboidScheduler = new CuboidScheduler(cubeDesc);
-        getSubCuboidIds(cuboidScheduler, baseCuboidId, result);
-        return result;
-    }
-
-    private void getSubCuboidIds(CuboidScheduler cuboidScheduler, long parentCuboidId, List<Long> result) {
-        result.add(parentCuboidId);
-        for (Long cuboidId: cuboidScheduler.getSpanningCuboid(parentCuboidId)) {
-            getSubCuboidIds(cuboidScheduler, cuboidId, result);
-        }
-    }
-
-
-    private HTableInterface createHTable(final CubeSegment cubeSegment) throws Exception {
-        final String hTableName = cubeSegment.getStorageLocationIdentifier();
-        CubeHTableUtil.createHTable(cubeSegment.getCubeDesc(), hTableName, null);
-        final HTableInterface hTable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(hTableName);
-        logger.info("hTable:" + hTableName + " for segment:" + cubeSegment.getName() + " created!");
-        return hTable;
-    }
-
-    @Override
-    protected void onStop() {
-
-    }
-
-    @Override
-    protected int batchInterval() {
-        return 5 * 60 * 1000;//5 min
-    }
-
-    @Override
-    protected int batchSize() {
-        return 1000;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3eee24f9/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
new file mode 100644
index 0000000..a229c65
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
@@ -0,0 +1,370 @@
+package org.apache.kylin.job.streaming;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.persistence.HBaseConnection;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.cube.CubeBuilder;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.dict.lookup.ReadableTable;
+import org.apache.kylin.dict.lookup.TableSignature;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsReducer;
+import org.apache.kylin.job.hadoop.cubev2.InMemKeyValueCreator;
+import org.apache.kylin.job.hadoop.hbase.CubeHTableUtil;
+import org.apache.kylin.job.inmemcubing.ICuboidWriter;
+import org.apache.kylin.job.inmemcubing.InMemCubeBuilder;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.cube.CuboidToGridTableMapping;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.streaming.MicroStreamBatch;
+import org.apache.kylin.streaming.MicroStreamBatchConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ */
+public class CubeStreamConsumer implements MicroStreamBatchConsumer {
+
+    private static final Logger logger = LoggerFactory.getLogger(CubeStreamConsumer.class);
+
+    private final CubeManager cubeManager;
+    private final String cubeName;
+    private final KylinConfig kylinConfig;
+    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+    private static final int BATCH_PUT_THRESHOLD = 10000;
+
+
+    public CubeStreamConsumer(String cubeName) {
+        this.kylinConfig = KylinConfig.getInstanceFromEnv();
+        this.cubeManager = CubeManager.getInstance(kylinConfig);
+        this.cubeName = cubeName;
+    }
+
+    @Override
+    public void consume(MicroStreamBatch microStreamBatch) throws Exception {
+        if (microStreamBatch.size() == 0) {
+            logger.info("nothing to build, skip to next iteration");
+            return;
+        }
+        final List<List<String>> parsedStreamMessages = microStreamBatch.getStreams();
+        long startOffset = microStreamBatch.getOffset().getFirst();
+        long endOffset = microStreamBatch.getOffset().getSecond();
+        LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue<List<String>>(parsedStreamMessages);
+        blockingQueue.put(Collections.<String>emptyList());
+
+        final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
+        final CubeDesc cubeDesc = cubeInstance.getDescriptor();
+        final CubeSegment cubeSegment = cubeManager.appendSegments(cubeManager.getCube(cubeName), System.currentTimeMillis(), false, false);
+        final Map<Long, HyperLogLogPlusCounter> samplingResult = sampling(cubeInstance.getDescriptor(), parsedStreamMessages);
+
+        final Configuration conf = HadoopUtil.getCurrentConfiguration();
+        final Path outputPath = new Path("file:///tmp/cuboidstatistics/" + UUID.randomUUID().toString());
+        FactDistinctColumnsReducer.writeCuboidStatistics(conf, outputPath, samplingResult, 100);
+        ResourceStore.getStore(kylinConfig).putResource(cubeSegment.getStatisticsResourcePath(), FileSystem.getLocal(conf).open(new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION)), 0);
+
+        final Map<TblColRef, Dictionary<?>> dictionaryMap = buildDictionary(cubeInstance, parsedStreamMessages);
+        writeDictionary(cubeSegment, dictionaryMap, startOffset, endOffset);
+
+        final HTableInterface hTable = createHTable(cubeSegment);
+
+        final CubeStreamRecordWriter gtRecordWriter = new CubeStreamRecordWriter(cubeDesc, hTable);
+        InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(blockingQueue, cubeInstance.getDescriptor(),
+                dictionaryMap, gtRecordWriter);
+
+        executorService.submit(inMemCubeBuilder).get();
+        gtRecordWriter.flush();
+        commitSegment(cubeSegment);
+    }
+
+    private void writeDictionary(CubeSegment cubeSegment, Map<TblColRef, Dictionary<?>> dictionaryMap, long startOffset, long endOffset) {
+        for (Map.Entry<TblColRef, Dictionary<?>> entry : dictionaryMap.entrySet()) {
+            final TblColRef tblColRef = entry.getKey();
+            final Dictionary<?> dictionary = entry.getValue();
+            TableSignature signature = new TableSignature();
+            signature.setLastModifiedTime(System.currentTimeMillis());
+            signature.setPath(String.format("streaming_%s_%s", startOffset, endOffset));
+            signature.setSize(endOffset - startOffset);
+            DictionaryInfo dictInfo = new DictionaryInfo(tblColRef.getTable(),
+                    tblColRef.getName(),
+                    tblColRef.getColumnDesc().getZeroBasedIndex(),
+                    tblColRef.getDatatype(),
+                    signature,
+                    ReadableTable.DELIM_AUTO);
+            logger.info("writing dictionary for TblColRef:" + tblColRef.toString());
+            DictionaryManager dictionaryManager = DictionaryManager.getInstance(kylinConfig);
+            try {
+                cubeSegment.putDictResPath(tblColRef, dictionaryManager.trySaveNewDict(dictionary, dictInfo).getResourcePath());
+            } catch (IOException e) {
+                logger.error("error save dictionary for column:" + tblColRef, e);
+                throw new RuntimeException("error save dictionary for column:" + tblColRef, e);
+            }
+        }
+    }
+
+    private class CubeStreamRecordWriter implements ICuboidWriter {
+        final List<InMemKeyValueCreator> keyValueCreators;
+        final int nColumns;
+        final HTableInterface hTable;
+        private final ByteBuffer byteBuffer;
+        private final CubeDesc cubeDesc;
+        private List<Put> puts = Lists.newArrayList();
+
+        private CubeStreamRecordWriter(CubeDesc cubeDesc, HTableInterface hTable) {
+            this.keyValueCreators = Lists.newArrayList();
+            this.cubeDesc = cubeDesc;
+            int startPosition = 0;
+            for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
+                for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
+                    keyValueCreators.add(new InMemKeyValueCreator(colDesc, startPosition));
+                    startPosition += colDesc.getMeasures().length;
+                }
+            }
+            this.nColumns = keyValueCreators.size();
+            this.hTable = hTable;
+            this.byteBuffer = ByteBuffer.allocate(1<<20);
+        }
+
+        private byte[] copy(byte[] array, int offset, int length) {
+            byte[] result = new byte[length];
+            System.arraycopy(array, offset, result, 0, length);
+            return result;
+        }
+
+        private ByteBuffer createKey(Long cuboidId, GTRecord record) {
+            byteBuffer.clear();
+            byteBuffer.put(Bytes.toBytes(cuboidId));
+            final int cardinality = BitSet.valueOf(new long[]{cuboidId}).cardinality();
+            for (int i = 0; i < cardinality; i++) {
+                final ByteArray byteArray = record.get(i);
+                byteBuffer.put(byteArray.array(), byteArray.offset(), byteArray.length());
+            }
+            return byteBuffer;
+        }
+
+        @Override
+        public void write(long cuboidId, GTRecord record) throws IOException {
+            final ByteBuffer key = createKey(cuboidId, record);
+            final CuboidToGridTableMapping mapping = new CuboidToGridTableMapping(Cuboid.findById(cubeDesc, cuboidId));
+            final ImmutableBitSet bitSet = new ImmutableBitSet(mapping.getDimensionCount(), mapping.getColumnCount());
+            for (int i = 0; i < nColumns; i++) {
+                final KeyValue keyValue = keyValueCreators.get(i).create(key.array(), 0, key.position(), record.getValues(bitSet, new Object[bitSet.cardinality()]));
+                final Put put = new Put(copy(key.array(), 0, key.position()));
+                byte[] family = copy(keyValue.getFamilyArray(), keyValue.getFamilyOffset(), keyValue.getFamilyLength());
+                byte[] qualifier = copy(keyValue.getQualifierArray(), keyValue.getQualifierOffset(), keyValue.getQualifierLength());
+                byte[] value = copy(keyValue.getValueArray(), keyValue.getValueOffset(), keyValue.getValueLength());
+                put.add(family, qualifier, value);
+                puts.add(put);
+            }
+            if (puts.size() >= BATCH_PUT_THRESHOLD) {
+                flush();
+            }
+        }
+
+        public final void flush() {
+            try {
+                if (!puts.isEmpty()) {
+                    long t = System.currentTimeMillis();
+                    if (hTable != null) {
+                        hTable.put(puts);
+                        hTable.flushCommits();
+                    }
+                    logger.info("commit total " + puts.size() + " puts, totally cost:" + (System.currentTimeMillis() - t) + "ms");
+                    puts.clear();
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private Map<TblColRef, Dictionary<?>> buildDictionary(final CubeInstance cubeInstance, List<List<String>> recordList) throws IOException {
+        final List<TblColRef> columnsNeedToBuildDictionary = cubeInstance.getDescriptor().listDimensionColumnsExcludingDerived();
+        final List<TblColRef> allDimensions = cubeInstance.getAllDimensions();
+        final HashMap<Integer, TblColRef> tblColRefMap = Maps.newHashMap();
+        for (TblColRef column: columnsNeedToBuildDictionary) {
+            final int index = allDimensions.indexOf(column);
+            Preconditions.checkArgument(index >= 0);
+            tblColRefMap.put(index, column);
+        }
+
+        HashMap<TblColRef, Dictionary<?>> result = Maps.newHashMap();
+
+        HashMultimap<TblColRef, String> valueMap = HashMultimap.create();
+        for (List<String> row : recordList) {
+            for (int i = 0; i < row.size(); i++) {
+                String cell = row.get(i);
+                if (tblColRefMap.containsKey(i)) {
+                    valueMap.put(tblColRefMap.get(i), cell);
+                }
+            }
+        }
+        for (TblColRef tblColRef : valueMap.keySet()) {
+            final Collection<byte[]> bytes = Collections2.transform(valueMap.get(tblColRef), new Function<String, byte[]>() {
+                @Nullable
+                @Override
+                public byte[] apply(String input) {
+                    return input == null ? null : input.getBytes();
+                }
+            });
+            final Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(tblColRef.getType(), bytes);
+            result.put(tblColRef, dict);
+        }
+        return result;
+    }
+
+    private Map<Long, HyperLogLogPlusCounter> sampling(CubeDesc cubeDesc, List<List<String>> streams) {
+        CubeJoinedFlatTableDesc intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
+        final int rowkeyLength = cubeDesc.getRowkey().getRowKeyColumns().length;
+        final List<Long> allCuboidIds = getAllCuboidIds(cubeDesc);
+        final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+        final Map<Long, Integer[]> allCuboidsBitSet = Maps.newHashMap();
+
+
+        Lists.transform(allCuboidIds, new Function<Long, Integer[]>() {
+            @Nullable
+            @Override
+            public Integer[] apply(@Nullable Long cuboidId) {
+                BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
+                Integer[] result = new Integer[bitSet.cardinality()];
+
+                long mask = Long.highestOneBit(baseCuboidId);
+                int position = 0;
+                for (int i = 0; i < rowkeyLength; i++) {
+                    if ((mask & cuboidId) > 0) {
+                        result[position] = i;
+                        position++;
+                    }
+                    mask = mask >> 1;
+                }
+                return result;
+            }
+        });
+        final Map<Long, HyperLogLogPlusCounter> result = Maps.newHashMapWithExpectedSize(allCuboidIds.size());
+        for (Long cuboidId : allCuboidIds) {
+            result.put(cuboidId, new HyperLogLogPlusCounter(14));
+            BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
+            Integer[] cuboidBitSet = new Integer[bitSet.cardinality()];
+
+            long mask = Long.highestOneBit(baseCuboidId);
+            int position = 0;
+            for (int i = 0; i < rowkeyLength; i++) {
+                if ((mask & cuboidId) > 0) {
+                    cuboidBitSet[position] = i;
+                    position++;
+                }
+                mask = mask >> 1;
+            }
+            allCuboidsBitSet.put(cuboidId, cuboidBitSet);
+        }
+
+        HashFunction hf = Hashing.murmur3_32();
+        ByteArray[] row_hashcodes = new ByteArray[rowkeyLength];
+        for (int i = 0; i < rowkeyLength; i++) {
+            row_hashcodes[i] = new ByteArray();
+        }
+        for (List<String> row : streams) {
+            //generate hash for each row key column
+            for (int i = 0; i < rowkeyLength; i++) {
+                Hasher hc = hf.newHasher();
+                final String cell = row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]);
+                if (cell != null) {
+                    row_hashcodes[i].set(hc.putString(cell).hash().asBytes());
+                } else {
+                    row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
+                }
+            }
+
+            for (Map.Entry<Long, HyperLogLogPlusCounter> longHyperLogLogPlusCounterEntry : result.entrySet()) {
+                Long cuboidId = longHyperLogLogPlusCounterEntry.getKey();
+                HyperLogLogPlusCounter counter = longHyperLogLogPlusCounterEntry.getValue();
+                Hasher hc = hf.newHasher();
+                final Integer[] cuboidBitSet = allCuboidsBitSet.get(cuboidId);
+                for (int position = 0; position < cuboidBitSet.length; position++) {
+                    hc.putBytes(row_hashcodes[cuboidBitSet[position]].array());
+                }
+                counter.add(hc.hash().asBytes());
+            }
+        }
+        return result;
+    }
+
+    //TODO: should we use cubeManager.promoteNewlyBuiltSegments?
+    private void commitSegment(CubeSegment cubeSegment) throws IOException {
+        cubeSegment.setStatus(SegmentStatusEnum.READY);
+        CubeBuilder cubeBuilder = new CubeBuilder(cubeSegment.getCubeInstance());
+        cubeBuilder.setToAddSegs(cubeSegment);
+        CubeManager.getInstance(kylinConfig).updateCube(cubeBuilder);
+    }
+
+    private List<Long> getAllCuboidIds(CubeDesc cubeDesc) {
+        final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+        List<Long> result = Lists.newArrayList();
+        CuboidScheduler cuboidScheduler = new CuboidScheduler(cubeDesc);
+        getSubCuboidIds(cuboidScheduler, baseCuboidId, result);
+        return result;
+    }
+
+    private void getSubCuboidIds(CuboidScheduler cuboidScheduler, long parentCuboidId, List<Long> result) {
+        result.add(parentCuboidId);
+        for (Long cuboidId: cuboidScheduler.getSpanningCuboid(parentCuboidId)) {
+            getSubCuboidIds(cuboidScheduler, cuboidId, result);
+        }
+    }
+
+
+    private HTableInterface createHTable(final CubeSegment cubeSegment) throws Exception {
+        final String hTableName = cubeSegment.getStorageLocationIdentifier();
+        CubeHTableUtil.createHTable(cubeSegment.getCubeDesc(), hTableName, null);
+        final HTableInterface hTable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(hTableName);
+        logger.info("hTable:" + hTableName + " for segment:" + cubeSegment.getName() + " created!");
+        return hTable;
+    }
+
+    @Override
+    public void stop() {
+
+    }
+
+}
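
The trickiest part of the consumer above is the cuboid sampling, carried over unchanged from the old CubeStreamBuilder: for each cuboid id it walks the row-key columns from the base cuboid's highest bit downwards, records which columns participate, and feeds only those columns' hashes into that cuboid's HyperLogLog counter. A standalone illustration of the bit walk, with made-up values:

    // Standalone illustration of the mask walk in sampling(); the ids below are made up.
    long baseCuboidId = 0b11111;     // 5 row-key columns, all present in the base cuboid
    long cuboidId     = 0b10110;     // an aggregation over row-key columns 0, 2 and 3
    int rowkeyLength  = 5;

    Integer[] cuboidBitSet = new Integer[Long.bitCount(cuboidId)];
    long mask = Long.highestOneBit(baseCuboidId);
    int position = 0;
    for (int i = 0; i < rowkeyLength; i++) {
        if ((mask & cuboidId) > 0) {          // column i participates in this cuboid
            cuboidBitSet[position++] = i;
        }
        mask = mask >> 1;
    }
    // cuboidBitSet is now [0, 2, 3]; only those columns' row hashes feed this cuboid's HLL counter.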

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3eee24f9/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 36f7dcf..cdade80 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -48,9 +48,10 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.model.IIDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.streaming.*;
-import org.apache.kylin.streaming.invertedindex.IIStreamBuilder;
+import org.apache.kylin.streaming.invertedindex.IIStreamConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -144,7 +145,7 @@ public class StreamingBootstrap {
     private void startCubeStreaming(StreamingConfig streamingConfig, final int partitionId) throws Exception {
         List<KafkaClusterConfig> kafkaClusterConfigs = streamingConfig.getKafkaClusterConfigs();
 
-        final List<List<BlockingQueue<StreamMessage>>> allClustersData = Lists.newArrayList();
+        final List<BlockingQueue<StreamMessage>> allClustersData = Lists.newArrayList();
 
         for (KafkaClusterConfig kafkaClusterConfig : kafkaClusterConfigs) {
             final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
@@ -152,35 +153,13 @@ public class StreamingBootstrap {
 
             final List<BlockingQueue<StreamMessage>> oneClusterData = consume(kafkaClusterConfig, partitionCount);
             logger.info("Cluster {} with {} partitions", allClustersData.size(), oneClusterData.size());
-            allClustersData.add(oneClusterData);
+            allClustersData.addAll(oneClusterData);
         }
 
-        final LinkedBlockingDeque<StreamMessage> alldata = new LinkedBlockingDeque<>();
-        Executors.newSingleThreadExecutor().execute(new Runnable() {
-            @Override
-            public void run() {
-                int totalMessage = 0;
-                while (true) {
-                    for (List<BlockingQueue<StreamMessage>> oneCluster : allClustersData) {
-                        for (BlockingQueue<StreamMessage> onePartition : oneCluster) {
-                            try {
-                                alldata.put(onePartition.take());
-                                if (totalMessage++ % 10000 == 0) {
-                                    logger.info("Total stream message count: " + totalMessage);
-                                }
-                            } catch (InterruptedException e) {
-                                throw new RuntimeException(e);
-                            }
-                        }
-                    }
-                }
-            }
-        });
-
         final String cubeName = streamingConfig.getCubeName();
         final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
 
-        CubeStreamBuilder cubeStreamBuilder = new CubeStreamBuilder(alldata, cubeName);
+        StreamBuilder cubeStreamBuilder = new StreamBuilder(allClustersData, new MicroBatchCondition(Integer.MAX_VALUE, 5 * 60 * 1000), new CubeStreamConsumer(cubeName));
         cubeStreamBuilder.setStreamParser(getStreamParser(streamingConfig, cubeInstance.getAllColumns()));
         cubeStreamBuilder.setStreamFilter(getStreamFilter(streamingConfig));
         final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
@@ -246,10 +225,14 @@ public class StreamingBootstrap {
         KafkaConsumer consumer = new KafkaConsumer(kafkaClusterConfig.getTopic(), partitionId, streamingOffset, kafkaClusterConfig.getBrokers(), kafkaClusterConfig, parallelism);
         kafkaConsumers.put(getKey(streamingConfig.getName(), partitionId), consumer);
 
+        final IIDesc iiDesc = iiSegment.getIIDesc();
+
         Executors.newSingleThreadExecutor().submit(consumer);
         final ExecutorService streamingBuilderPool = Executors.newFixedThreadPool(parallelism);
         for (int i = startShard; i < endShard; ++i) {
-            final IIStreamBuilder task = new IIStreamBuilder(consumer.getStreamQueue(i % parallelism), streamingConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiSegment.getIIDesc(), i);
+            final StreamBuilder task = new StreamBuilder(consumer.getStreamQueue(i % parallelism),
+                    new MicroBatchCondition(iiDesc.getSliceSize(), Integer.MAX_VALUE),
+                    new IIStreamConsumer(streamingConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiDesc, i));
             task.setStreamParser(getStreamParser(streamingConfig, ii.getDescriptor().listAllColumns()));
             if (i == endShard - 1) {
                 streamingBuilderPool.submit(task).get();
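
The two MicroBatchCondition call sites in this commit suggest that the first constructor argument caps the number of messages per batch and the second caps the batch interval in milliseconds (the class itself, new in this commit, is not shown in this mail): cube streaming cuts batches by time only, inverted-index streaming by slice size only.

    // Argument order inferred from the two call sites, not from MicroBatchCondition itself:
    new MicroBatchCondition(Integer.MAX_VALUE, 5 * 60 * 1000);          // cube: no count cap, flush every 5 minutes
    new MicroBatchCondition(iiDesc.getSliceSize(), Integer.MAX_VALUE);  // II: flush per slice of messages, no time cap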

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3eee24f9/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
index a73c5b9..43dc769 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
@@ -58,8 +58,10 @@ import org.apache.kylin.job.hadoop.cube.StorageCleanupJob;
 import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.apache.kylin.streaming.MicroBatchCondition;
+import org.apache.kylin.streaming.StreamBuilder;
 import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.invertedindex.IIStreamBuilder;
+import org.apache.kylin.streaming.invertedindex.IIStreamConsumer;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -212,7 +214,7 @@ public class BuildIIWithStreamTest {
         ToolRunner.run(new IICreateHTableJob(), args);
 
         ExecutorService executorService = Executors.newSingleThreadExecutor();
-        final IIStreamBuilder streamBuilder = new IIStreamBuilder(queue, iiName, segment.getStorageLocationIdentifier(), segment.getIIDesc(), 0);
+        final StreamBuilder streamBuilder = new StreamBuilder(queue, new MicroBatchCondition(segment.getIIDesc().getSliceSize(), Integer.MAX_VALUE), new IIStreamConsumer(iiName, segment.getStorageLocationIdentifier(), segment.getIIDesc(), 0));
 
         List<String[]> sorted = getSortedRows(reader, desc.getTimestampColumn());
         int count = sorted.size();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3eee24f9/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index 7915080..080a2fd 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -63,10 +63,10 @@ public class IITest extends LocalFileMetadataTestCase {
 
     List<IIRow> iiRows;
 
-    final String[] inputData = new String[] { //
-    "FP-non GTC,0,15,145970,0,28,Toys,2008-10-08 07:18:40,USER_Y,Toys & Hobbies,Models & Kits,Automotive,0,Ebay,USER_S,15,Professional-Other,2012-08-16,2012-08-11,0,2012-08-16,145970,10000329,26.8551,0", //
+    final String[] inputData = new String[]{ //
+            "FP-non GTC,0,15,145970,0,28,Toys,2008-10-08 07:18:40,USER_Y,Toys & Hobbies,Models & Kits,Automotive,0,Ebay,USER_S,15,Professional-Other,2012-08-16,2012-08-11,0,2012-08-16,145970,10000329,26.8551,0", //
             "ABIN,0,-99,43479,0,21,Photo,2012-09-11 20:26:04,USER_Y,Cameras & Photo,Film Photography,Other,0,Ebay,USER_S,-99,Not Applicable,2012-08-16,2012-08-11,0,2012-08-16,43479,10000807,26.2474,0", //
-            "ABIN,0,16,80053,0,12,Computers,2012-06-19 21:15:09,USER_Y,Computers/Tablets & Networking,MonitorProjectors & Accs,Monitors,0,Ebay,USER_S,16,Consumer-Other,2012-08-16,2012-08-11,0,2012-08-16,80053,10000261,94.2273,0" };
+            "ABIN,0,16,80053,0,12,Computers,2012-06-19 21:15:09,USER_Y,Computers/Tablets & Networking,MonitorProjectors & Accs,Monitors,0,Ebay,USER_S,16,Consumer-Other,2012-08-16,2012-08-11,0,2012-08-16,80053,10000261,94.2273,0"};
 
     @Before
     public void setUp() throws Exception {
@@ -85,34 +85,15 @@ public class IITest extends LocalFileMetadataTestCase {
         List<List<String>> parsedStreamMessages = Lists.newArrayList();
         StreamParser parser = StringStreamParser.instance;
         StreamFilter filter = DefaultStreamFilter.instance;
-        long startOffset = Long.MAX_VALUE;
-        long endOffset = Long.MIN_VALUE;
-        long startTimestamp = Long.MAX_VALUE;
-        long endTimestamp = Long.MIN_VALUE;
 
-        for(StreamMessage message: streamMessages)
-        {
+        MicroStreamBatch batch = new MicroStreamBatch();
+        for (StreamMessage message : streamMessages) {
             ParsedStreamMessage parsedStreamMessage = parser.parse(message);
-            if(filter.filter(parsedStreamMessage))
-            {
-                if (startOffset > parsedStreamMessage.getOffset()) {
-                    startOffset = parsedStreamMessage.getOffset();
-                }
-                if (endOffset < parsedStreamMessage.getOffset()) {
-                    endOffset = parsedStreamMessage.getOffset();
-                }
-                if (startTimestamp > parsedStreamMessage.getTimestamp()) {
-                    startTimestamp = parsedStreamMessage.getTimestamp();
-                }
-                if (endTimestamp < parsedStreamMessage.getTimestamp()) {
-                    endTimestamp = parsedStreamMessage.getTimestamp();
-                }
-                parsedStreamMessages.add(parsedStreamMessage.getStreamMessage());
+            if (filter.filter(parsedStreamMessage)) {
+                batch.add(parsedStreamMessage);
             }
         }
 
-        MicroStreamBatch batch = new MicroStreamBatch(parsedStreamMessages, org.apache.kylin.common.util.Pair.newPair(startTimestamp, endTimestamp), org.apache.kylin.common.util.Pair.newPair(startOffset, endOffset));
-
 
         iiRows = Lists.newArrayList();
         final Slice slice = new SliceBuilder(iiDesc, (short) 0, true).buildSlice((batch));
@@ -267,7 +248,7 @@ public class IITest extends LocalFileMetadataTestCase {
             @Nullable
             @Override
             public Pair<ImmutableBytesWritable, Result> apply(@Nullable IIRow input) {
-                return new Pair<ImmutableBytesWritable, Result>(new ImmutableBytesWritable(new byte[] { 1 }), Result.create(input.makeCells()));
+                return new Pair<ImmutableBytesWritable, Result>(new ImmutableBytesWritable(new byte[]{1}), Result.create(input.makeCells()));
             }
         })));
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3eee24f9/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java
deleted file mode 100644
index 3ef542d..0000000
--- a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package org.apache.kylin.job.streaming;
-
-import org.apache.hadoop.util.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeBuilder;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.job.DeployUtil;
-import org.apache.kylin.streaming.StreamMessage;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingDeque;
-
-/**
- */
-@Ignore
-public class CubeStreamBuilderTest {
-
-    private static final Logger logger = LoggerFactory.getLogger(CubeStreamBuilderTest.class);
-
-    private KylinConfig kylinConfig;
-
-    private static final String CUBE_NAME = "test_kylin_cube_without_slr_left_join_ready";
-
-    @BeforeClass
-    public static void beforeClass() throws Exception {
-        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        System.setProperty("hdp.version", "2.2.0.0-2041"); // mapred-site.xml ref this
-    }
-
-    @Before
-    public void before() throws Exception {
-        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
-
-        kylinConfig = KylinConfig.getInstanceFromEnv();
-        DeployUtil.initCliWorkDir();
-        DeployUtil.deployMetadata();
-        DeployUtil.overrideJobJarLocations();
-        final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(CUBE_NAME);
-        CubeBuilder cubeBuilder = new CubeBuilder(cube);
-        cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()]));
-        // remove all existing segments
-        CubeManager.getInstance(kylinConfig).updateCube(cubeBuilder);
-
-    }
-
-    @Test
-    public void test() throws Exception {
-        LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<>();
-        CubeStreamBuilder cubeStreamBuilder = new CubeStreamBuilder(queue, CUBE_NAME);
-        final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
-        loadDataFromLocalFile(queue, 100000);
-        future.get();
-    }
-
-    private void loadDataFromLocalFile(BlockingQueue<StreamMessage> queue, final int maxCount) throws IOException, InterruptedException {
-        BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("../table.txt")));
-        String line;
-        int count = 0;
-        while ((line = br.readLine()) != null && count++ < maxCount) {
-            final List<String> strings = Arrays.asList(line.split("\t"));
-            queue.put(new StreamMessage(System.currentTimeMillis(), StringUtils.join(",", strings).getBytes()));
-        }
-        queue.put(StreamMessage.EOF);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3eee24f9/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
new file mode 100644
index 0000000..8377851
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
@@ -0,0 +1,83 @@
+package org.apache.kylin.job.streaming;
+
+import org.apache.hadoop.util.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeBuilder;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.DeployUtil;
+import org.apache.kylin.streaming.MicroBatchCondition;
+import org.apache.kylin.streaming.StreamBuilder;
+import org.apache.kylin.streaming.StreamMessage;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+
+/**
+ */
+@Ignore
+public class CubeStreamConsumerTest {
+
+    private static final Logger logger = LoggerFactory.getLogger(CubeStreamConsumerTest.class);
+
+    private KylinConfig kylinConfig;
+
+    private static final String CUBE_NAME = "test_kylin_cube_without_slr_left_join_ready";
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+        System.setProperty("hdp.version", "2.2.0.0-2041"); // mapred-site.xml ref this
+    }
+
+    @Before
+    public void before() throws Exception {
+        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+        DeployUtil.initCliWorkDir();
+        DeployUtil.deployMetadata();
+        DeployUtil.overrideJobJarLocations();
+        final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(CUBE_NAME);
+        CubeBuilder cubeBuilder = new CubeBuilder(cube);
+        cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()]));
+        // remove all existing segments
+        CubeManager.getInstance(kylinConfig).updateCube(cubeBuilder);
+
+    }
+
+    @Test
+    public void test() throws Exception {
+        LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<>();
+        StreamBuilder cubeStreamBuilder = new StreamBuilder(queue, new MicroBatchCondition(Integer.MAX_VALUE, 30 * 1000), new CubeStreamConsumer(CUBE_NAME));
+        final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
+        loadDataFromLocalFile(queue, 100000);
+        future.get();
+    }
+
+    private void loadDataFromLocalFile(BlockingQueue<StreamMessage> queue, final int maxCount) throws IOException, InterruptedException {
+        BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("../table.txt")));
+        String line;
+        int count = 0;
+        while ((line = br.readLine()) != null && count++ < maxCount) {
+            final List<String> strings = Arrays.asList(line.split("\t"));
+            queue.put(new StreamMessage(System.currentTimeMillis(), StringUtils.join(",", strings).getBytes()));
+        }
+        queue.put(StreamMessage.EOF);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3eee24f9/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java b/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java
index 78de231..1ca881e 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java
@@ -72,8 +72,14 @@ public final class JsonStreamParser implements StreamParser {
+                boolean matched = false;
                 for (Map.Entry<String, String> entry : json.entrySet()) {
                     if (entry.getKey().equalsIgnoreCase(column.getName())) {
                         result.add(entry.getValue());
+                        matched = true;
+                        break;
                     }
                 }
+                if (!matched) {
+                    result.add(null);
+                }
             }
             return new ParsedStreamMessage(result, streamMessage.getOffset(), streamMessage.getOffset());
         } catch (IOException e) {
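
Note for reviewers: the intent of the change above is that the parser emits exactly one value per table column, falling back to null when the JSON message has no case-insensitively matching key, so the result list stays positionally aligned with the column list. A standalone sketch of that contract (class and method names here are illustrative, not the parser's actual API):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    public class ColumnLookupSketch {

        // one value per requested column; null when the message has no case-insensitive match
        static List<String> project(Map<String, String> json, List<String> columnNames) {
            List<String> result = new ArrayList<String>();
            for (String column : columnNames) {
                String matched = null;
                for (Map.Entry<String, String> entry : json.entrySet()) {
                    if (entry.getKey().equalsIgnoreCase(column)) {
                        matched = entry.getValue();
                        break;
                    }
                }
                result.add(matched); // keeps the result aligned with the column order even for absent keys
            }
            return result;
        }
    }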

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3eee24f9/streaming/src/main/java/org/apache/kylin/streaming/MicroBatchCondition.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/MicroBatchCondition.java b/streaming/src/main/java/org/apache/kylin/streaming/MicroBatchCondition.java
new file mode 100644
index 0000000..baf7b04
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/MicroBatchCondition.java
@@ -0,0 +1,22 @@
+package org.apache.kylin.streaming;
+
+/**
+ */
+public final class MicroBatchCondition {
+
+    private final int batchSize;
+    private final int batchInterval;
+
+    public MicroBatchCondition(int batchSize, int batchInterval) {
+        this.batchSize = batchSize;
+        this.batchInterval = batchInterval;
+    }
+
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+    public int getBatchInterval() {
+        return batchInterval;
+    }
+}
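
Both fields of MicroBatchCondition are upper bounds on a micro batch: batchSize caps the number of parsed messages, and batchInterval caps the spread, in milliseconds, between the batch's first message timestamp and the incoming one. CubeStreamConsumerTest above effectively disables the size cap and cuts purely on a 30-second window; a condition closer to the removed IIStreamBuilder defaults further down might look like this (the values are illustrative):

    import org.apache.kylin.streaming.MicroBatchCondition;

    public class BatchConditionExample {
        public static void main(String[] args) {
            // cut a batch after 10000 messages or 2 minutes of event time, whichever is reached first
            MicroBatchCondition condition = new MicroBatchCondition(10000, 2 * 60 * 1000);
            System.out.println(condition.getBatchSize() + " messages / " + condition.getBatchInterval() + " ms");
        }
    }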

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3eee24f9/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java b/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
index 0adcee2..268c98c 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
@@ -1,5 +1,6 @@
 package org.apache.kylin.streaming;
 
+import com.google.common.collect.Lists;
 import org.apache.kylin.common.util.Pair;
 
 import java.util.List;
@@ -14,10 +15,16 @@ public final class MicroStreamBatch {
 
     private final Pair<Long, Long> offset;
 
-    public MicroStreamBatch(List<List<String>> streams, Pair<Long, Long> timestamp, Pair<Long, Long> offset) {
-        this.streams = streams;
-        this.timestamp = timestamp;
-        this.offset = offset;
+    public MicroStreamBatch() {
+        this.streams = Lists.newLinkedList();
+        this.timestamp = Pair.newPair(Long.MAX_VALUE, Long.MIN_VALUE);
+        this.offset = Pair.newPair(Long.MAX_VALUE, Long.MIN_VALUE);
+    }
+
+    private MicroStreamBatch(MicroStreamBatch batch) {
+        this.streams = Lists.newLinkedList(batch.streams);
+        this.timestamp = Pair.newPair(batch.timestamp.getFirst(), batch.timestamp.getSecond());
+        this.offset = Pair.newPair(batch.offset.getFirst(), batch.offset.getSecond());
     }
 
     public final List<List<String>> getStreams() {
@@ -35,4 +42,34 @@ public final class MicroStreamBatch {
     public final int size() {
         return streams.size();
     }
+
+    public final void add(ParsedStreamMessage parsedStreamMessage) {
+        if (offset.getFirst() > parsedStreamMessage.getOffset()) {
+            offset.setFirst(parsedStreamMessage.getOffset());
+        }
+        if (offset.getSecond() < parsedStreamMessage.getOffset()) {
+            offset.setSecond(parsedStreamMessage.getOffset());
+        }
+        if (timestamp.getFirst() > parsedStreamMessage.getTimestamp()) {
+            timestamp.setFirst(parsedStreamMessage.getTimestamp());
+        }
+        if (timestamp.getSecond() < parsedStreamMessage.getTimestamp()) {
+            timestamp.setSecond(parsedStreamMessage.getTimestamp());
+        }
+        this.streams.add(parsedStreamMessage.getStreamMessage());
+    }
+
+    public static MicroStreamBatch union(MicroStreamBatch one, MicroStreamBatch another) {
+        MicroStreamBatch result = new MicroStreamBatch(one);
+        result.streams.addAll(another.streams);
+        result.offset.setFirst(Math.min(result.offset.getFirst(), another.offset.getFirst()));
+        result.offset.setSecond(Math.max(result.offset.getSecond(), another.offset.getSecond()));
+        result.timestamp.setFirst(Math.min(result.timestamp.getFirst(), another.timestamp.getFirst()));
+        result.timestamp.setSecond(Math.max(result.timestamp.getSecond(), another.timestamp.getSecond()));
+        return result;
+    }
+
+    public boolean isEmpty() {
+        return streams.isEmpty();
+    }
 }
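
A quick note on the new MicroStreamBatch API: add() appends the parsed row and widens the batch's offset and timestamp ranges, and union() merges two batches collected from different inputs into one. A hedged usage sketch, assuming the StreamMessage and StringStreamParser usage already seen in the tests in this patch:

    import java.util.Arrays;

    import org.apache.kylin.streaming.MicroStreamBatch;
    import org.apache.kylin.streaming.StreamMessage;
    import org.apache.kylin.streaming.StringStreamParser;

    public class MicroStreamBatchSketch {
        public static void main(String[] args) {
            MicroStreamBatch first = new MicroStreamBatch();
            MicroStreamBatch second = new MicroStreamBatch();
            for (String line : Arrays.asList("a,1", "b,2")) {
                // add() collects the row and tracks min/max offset and timestamp
                first.add(StringStreamParser.instance.parse(new StreamMessage(System.currentTimeMillis(), line.getBytes())));
            }
            second.add(StringStreamParser.instance.parse(new StreamMessage(System.currentTimeMillis(), "c,3".getBytes())));
            // union() concatenates the rows and merges the two ranges
            MicroStreamBatch merged = MicroStreamBatch.union(first, second);
            System.out.println(merged.size() + " rows, offsets " + merged.getOffset() + ", timestamps " + merged.getTimestamp());
        }
    }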

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3eee24f9/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatchConsumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatchConsumer.java b/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatchConsumer.java
new file mode 100644
index 0000000..37d6076
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatchConsumer.java
@@ -0,0 +1,11 @@
+package org.apache.kylin.streaming;
+
+/**
+ */
+public interface MicroStreamBatchConsumer {
+
+    void consume(MicroStreamBatch microStreamBatch) throws Exception;
+
+    void stop();
+
+}
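
The consumer contract is intentionally small: StreamBuilder calls consume() once per micro batch it cuts and stop() when a fetcher hits the EOF marker. A minimal sketch of an implementation and its wiring, assuming the single-queue StreamBuilder constructor introduced in this patch (the logging consumer itself is illustrative, not part of the change):

    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingDeque;

    import org.apache.kylin.streaming.MicroBatchCondition;
    import org.apache.kylin.streaming.MicroStreamBatch;
    import org.apache.kylin.streaming.MicroStreamBatchConsumer;
    import org.apache.kylin.streaming.StreamBuilder;
    import org.apache.kylin.streaming.StreamMessage;

    public class LoggingConsumerSketch implements MicroStreamBatchConsumer {

        @Override
        public void consume(MicroStreamBatch microStreamBatch) throws Exception {
            // a real consumer builds a slice or cube segment here; this one only reports the batch
            System.out.println("batch of " + microStreamBatch.size() + " rows, offsets " + microStreamBatch.getOffset());
        }

        @Override
        public void stop() {
            System.out.println("EOF reached, releasing resources");
        }

        public static void main(String[] args) throws Exception {
            LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<StreamMessage>();
            StreamBuilder builder = new StreamBuilder(queue, new MicroBatchCondition(1000, 30 * 1000), new LoggingConsumerSketch());
            Executors.newSingleThreadExecutor().submit(builder);
            // producers put StreamMessage objects on the queue; queue.put(StreamMessage.EOF) ends the run
        }
    }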

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3eee24f9/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index 07b8616..c9d2795 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -34,18 +34,18 @@
 
 package org.apache.kylin.streaming;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import org.apache.kylin.common.util.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 /**
  */
-public abstract class StreamBuilder implements Runnable {
+public class StreamBuilder implements Runnable {
 
     private static final Logger logger = LoggerFactory.getLogger(StreamBuilder.class);
 
@@ -53,97 +53,159 @@ public abstract class StreamBuilder implements Runnable {
 
     private StreamFilter streamFilter = DefaultStreamFilter.instance;
 
-    private BlockingQueue<StreamMessage> streamMessageQueue;
-    private long lastBuildTime = System.currentTimeMillis();
+    private final List<BlockingQueue<StreamMessage>> streamMessageQueues;
 
-    private long startOffset;
-    private long endOffset;
+    private final MicroStreamBatchConsumer consumer;
 
-    private long startTimestamp;
-    private long endTimestamp;
+    private final MicroBatchCondition condition;
 
-    public StreamBuilder(BlockingQueue<StreamMessage> streamMessageQueue) {
-        this.streamMessageQueue = streamMessageQueue;
+    public StreamBuilder(List<BlockingQueue<StreamMessage>> inputs, MicroBatchCondition condition, MicroStreamBatchConsumer consumer) {
+        Preconditions.checkArgument(inputs.size() > 0);
+        this.streamMessageQueues = Lists.newArrayList();
+        this.consumer = Preconditions.checkNotNull(consumer);
+        this.condition = condition;
+        init(inputs);
     }
 
-    protected abstract void build(MicroStreamBatch microStreamBatch) throws Exception;
+    public StreamBuilder(BlockingQueue<StreamMessage> input, MicroBatchCondition condition, MicroStreamBatchConsumer consumer) {
+        this.streamMessageQueues = Lists.newArrayList();
+        this.consumer = Preconditions.checkNotNull(consumer);
+        this.condition = condition;
+        init(Preconditions.checkNotNull(input));
+    }
 
-    protected abstract void onStop();
+    private void init(BlockingQueue<StreamMessage> input) {
+        this.streamMessageQueues.add(input);
+    }
 
-    private void clearCounter() {
-        lastBuildTime = System.currentTimeMillis();
-        startOffset = Long.MAX_VALUE;
-        endOffset = Long.MIN_VALUE;
-        startTimestamp = Long.MAX_VALUE;
-        endTimestamp = Long.MIN_VALUE;
+    private void init(List<BlockingQueue<StreamMessage>> inputs) {
+        this.streamMessageQueues.addAll(inputs);
     }
 
     @Override
     public void run() {
         try {
-            List<List<String>> parsedStreamMessages = null;
-            int filteredMsgCount = 0;
+            final int inputCount = streamMessageQueues.size();
+            final ExecutorService executorService = Executors.newFixedThreadPool(inputCount);
             while (true) {
-                if (parsedStreamMessages == null) {
-                    parsedStreamMessages = Lists.newLinkedList();
-                    clearCounter();
-                }
-                StreamMessage streamMessage;
-                try {
-                    streamMessage = streamMessageQueue.poll(30, TimeUnit.SECONDS);
-                } catch (InterruptedException e) {
-                    logger.warn("stream queue should not be interrupted", e);
-                    continue;
+                CountDownLatch countDownLatch = new CountDownLatch(inputCount);
+                ArrayList<Future<MicroStreamBatch>> futures = Lists.newArrayListWithExpectedSize(inputCount);
+                for (BlockingQueue<StreamMessage> streamMessageQueue : streamMessageQueues) {
+                    futures.add(executorService.submit(new StreamFetcher(streamMessageQueue, countDownLatch)));
                 }
-                if (streamMessage == null) {
-                    logger.info("The stream queue is drained, current available stream count: " + parsedStreamMessages.size());
-                    if ((System.currentTimeMillis() - lastBuildTime) > batchInterval() && !parsedStreamMessages.isEmpty()) {
-                        logger.info("Building batch due to time threshold, batch size: " + parsedStreamMessages.size());
-                        build(new MicroStreamBatch(parsedStreamMessages, Pair.newPair(startTimestamp, endTimestamp), Pair.newPair(startOffset, endOffset)));
-                        parsedStreamMessages = null;
+                countDownLatch.await();
+                ArrayList<MicroStreamBatch> batches = Lists.newArrayListWithExpectedSize(inputCount);
+                for (Future<MicroStreamBatch> future : futures) {
+                    if (future.get() != null) {
+                        batches.add(future.get());
+                    } else {
+                        //EOF occurs, stop consumer
+                        consumer.stop();
+                        return;
                     }
-                    continue;
                 }
-                if (streamMessage.getOffset() < 0) {
-                    onStop();
-                    logger.warn("streaming encountered EOF, stop building. The remaining {} filtered messages will be discarded", filteredMsgCount);
-                    break;
+                MicroStreamBatch batch = batches.get(0);
+                if (batches.size() > 1) {
+                    for (int i = 1; i < inputCount; i++) {
+                        batch = MicroStreamBatch.union(batch, batches.get(i));
+                    }
                 }
+                consumer.consume(batch);
+            }
+        } catch (InterruptedException e) {
+            throw new RuntimeException("stream fetcher thread should not be interrupted", e);
+        } catch (ExecutionException e) {
+            logger.error("stream fetch thread encountered exception", e);
+            throw new RuntimeException("stream fetch thread encountered exception", e);
+        } catch (Exception e) {
+            logger.error("consumer encountered exception", e);
+            throw new RuntimeException("consumer encountered exception", e);
+        }
+    }
 
-                final ParsedStreamMessage parsedStreamMessage = getStreamParser().parse(streamMessage);
+    private class StreamFetcher implements Callable<MicroStreamBatch> {
 
-                if (getStreamFilter().filter(parsedStreamMessage)) {
+        private final BlockingQueue<StreamMessage> streamMessageQueue;
+        private final CountDownLatch countDownLatch;
+        private long lastBuildTime = System.currentTimeMillis();
+        private long lastBatchTimestamp = -1;
 
-                    if (filteredMsgCount++ % 10000 == 0) {
-                        logger.info("Total filtered stream message count: " + filteredMsgCount);
-                    }
+        public StreamFetcher(BlockingQueue<StreamMessage> streamMessageQueue, CountDownLatch countDownLatch) {
+            this.streamMessageQueue = streamMessageQueue;
+            this.countDownLatch = countDownLatch;
+        }
+
+        private void clearCounter() {
+            lastBuildTime = System.currentTimeMillis();
+        }
 
-                    if (startOffset > parsedStreamMessage.getOffset()) {
-                        startOffset = parsedStreamMessage.getOffset();
+        private StreamMessage peek(BlockingQueue<StreamMessage> queue, long timeout) {
+            long t = System.currentTimeMillis();
+            while (true) {
+                final StreamMessage peek = queue.peek();
+                if (peek != null) {
+                    return peek;
+                } else {
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e) {
+                        logger.warn("stream queue should not be interrupted", e);
+                        return null;
+                    }
+                    if (System.currentTimeMillis() - t > timeout) {
+                        break;
                     }
-                    if (endOffset < parsedStreamMessage.getOffset()) {
-                        endOffset = parsedStreamMessage.getOffset();
+                }
+            }
+            return queue.peek();
+        }
+
+        @Override
+        public MicroStreamBatch call() throws Exception {
+            try {
+                MicroStreamBatch microStreamBatch = null;
+                while (true) {
+                    if (microStreamBatch == null) {
+                        microStreamBatch = new MicroStreamBatch();
+                        clearCounter();
                     }
-                    if (startTimestamp > parsedStreamMessage.getTimestamp()) {
-                        startTimestamp = parsedStreamMessage.getTimestamp();
+                    StreamMessage streamMessage = peek(streamMessageQueue, 30000);
+                    if (streamMessage == null) {
+                        logger.info("The stream queue is drained, current available stream count: " + microStreamBatch.size());
+                        if (!microStreamBatch.isEmpty()) {
+                            return microStreamBatch;
+                        } else {
+                            continue;
+                        }
                     }
-                    if (endTimestamp < parsedStreamMessage.getTimestamp()) {
-                        endTimestamp = parsedStreamMessage.getTimestamp();
+                    if (streamMessage.getOffset() < 0) {
+                        consumer.stop();
+                        logger.warn("streaming encountered EOF, stop building");
+                        return null;
                     }
-                    parsedStreamMessages.add(parsedStreamMessage.getStreamMessage());
-                    if (parsedStreamMessages.size() >= batchSize()) {
-                        logger.info("Building batch due to size threshold, batch size: " + parsedStreamMessages.size());
-                        build(new MicroStreamBatch(parsedStreamMessages, Pair.newPair(startTimestamp, endTimestamp), Pair.newPair(startOffset, endOffset)));
-                        parsedStreamMessages = null;
+
+                    final ParsedStreamMessage parsedStreamMessage = getStreamParser().parse(streamMessage);
+                    if (parsedStreamMessage.getTimestamp() - microStreamBatch.getTimestamp().getFirst() > condition.getBatchInterval()) {
+                        //batch interval exceeded, leave this message in the queue for the next batch
+                        return microStreamBatch;
+                    } else {
+                        streamMessageQueue.take();
+                        if (getStreamFilter().filter(parsedStreamMessage)) {
+                            microStreamBatch.add(parsedStreamMessage);
+                            if (microStreamBatch.size() >= condition.getBatchSize()) {
+                                return microStreamBatch;
+                            }
+                        } else {
+                            //ignore stream messages rejected by the filter
+                        }
                     }
-                } else {
-                    //ignore unfiltered stream message
                 }
-
+            } catch (Exception e) {
+                logger.error("build stream error, stop building", e);
+                throw new RuntimeException("build stream error, stop building", e);
+            } finally {
+                countDownLatch.countDown();
             }
-        } catch (Exception e) {
-            logger.error("build stream error, stop building", e);
-            throw new RuntimeException("build stream error, stop building", e);
         }
     }
 
@@ -163,7 +225,4 @@ public abstract class StreamBuilder implements Runnable {
         this.streamFilter = streamFilter;
     }
 
-    protected abstract int batchInterval();
-
-    protected abstract int batchSize();
 }
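
To summarize the new control flow: run() starts one StreamFetcher per input queue, waits for all of them on a CountDownLatch, unions the resulting MicroStreamBatches, and hands the merged batch to the consumer; a null batch from any fetcher means EOF was seen, so the consumer is stopped and the run ends. A hedged sketch of driving it over several input queues, reusing the illustrative LoggingConsumerSketch from above:

    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingDeque;

    import com.google.common.collect.Lists;

    import org.apache.kylin.streaming.MicroBatchCondition;
    import org.apache.kylin.streaming.StreamBuilder;
    import org.apache.kylin.streaming.StreamMessage;

    public class MultiQueueStreamBuilderSketch {
        public static void main(String[] args) {
            // one queue per input shard/partition (the count of 3 is illustrative)
            List<BlockingQueue<StreamMessage>> inputs = Lists.newArrayList();
            for (int i = 0; i < 3; i++) {
                inputs.add(new LinkedBlockingDeque<StreamMessage>());
            }
            StreamBuilder builder = new StreamBuilder(inputs,
                    new MicroBatchCondition(Integer.MAX_VALUE, 30 * 1000),
                    new LoggingConsumerSketch());
            Executors.newSingleThreadExecutor().submit(builder);
            // putting StreamMessage.EOF on a queue makes its fetcher return null, which stops the consumer
        }
    }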

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3eee24f9/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
deleted file mode 100644
index cf86b44..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.invertedindex;
-
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.Lists;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.HBaseConnection;
-import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
-import org.apache.kylin.invertedindex.model.IIRow;
-import org.apache.kylin.streaming.MicroStreamBatch;
-import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.StreamBuilder;
-import org.apache.kylin.streaming.StreamingManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-/**
- */
-public class IIStreamBuilder extends StreamBuilder {
-
-    private static Logger logger = LoggerFactory.getLogger(IIStreamBuilder.class);
-    private static final int BATCH_BUILD_INTERVAL_THRESHOLD = 2 * 60 * 1000;
-
-    private final IIDesc desc;
-    private final HTableInterface hTable;
-    private final SliceBuilder sliceBuilder;
-    private final int shardId;
-    private final String streaming;
-    private final int batchSize;
-    private StreamingManager streamingManager;
-
-    public IIStreamBuilder(BlockingQueue<StreamMessage> queue, String streaming, String hTableName, IIDesc iiDesc, int shard) {
-        super(queue);
-        this.batchSize = iiDesc.getSliceSize();
-        this.streaming = streaming;
-        this.desc = iiDesc;
-        this.shardId = shard;
-        try {
-            this.hTable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(hTableName);
-            this.hTable.setAutoFlushTo(true);
-        } catch (IOException e) {
-            logger.error("cannot open htable name:" + hTableName, e);
-            throw new RuntimeException("cannot open htable name:" + hTableName, e);
-        }
-        this.sliceBuilder = new SliceBuilder(desc, (short) shard, iiDesc.isUseLocalDictionary());
-        this.streamingManager = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv());
-    }
-
-    @Override
-    protected void build(MicroStreamBatch microStreamBatch) throws IOException {
-        if (microStreamBatch.size() > 0) {
-            long offset = microStreamBatch.getOffset().getFirst();
-            if (offset < streamingManager.getOffset(streaming, shardId)) {
-                logger.info("this batch has already been built, skip building");
-                return;
-            }
-            logger.info("stream build start, size:" + microStreamBatch.size());
-            Stopwatch stopwatch = new Stopwatch();
-            stopwatch.start();
-            final Slice slice = sliceBuilder.buildSlice(microStreamBatch);
-            logger.info("slice info, shard:" + slice.getShard() + " timestamp:" + slice.getTimestamp() + " record count:" + slice.getRecordCount());
-
-            loadToHBase(hTable, slice, new IIKeyValueCodec(slice.getInfo()));
-            submitOffset(offset);
-            stopwatch.stop();
-            logger.info("stream build finished, size:" + microStreamBatch.size() + " elapsed time:" + stopwatch.elapsedTime(TimeUnit.MILLISECONDS) + " " + TimeUnit.MILLISECONDS);
-        } else {
-            logger.info("nothing to build, skip building");
-        }
-    }
-
-    @Override
-    protected void onStop() {
-        try {
-            this.hTable.close();
-        } catch (IOException e) {
-            logger.error("onStop throw exception", e);
-        }
-    }
-
-    @Override
-    protected int batchInterval() {
-        return BATCH_BUILD_INTERVAL_THRESHOLD;
-    }
-
-    @Override
-    protected int batchSize() {
-        return batchSize;
-    }
-
-    private void loadToHBase(HTableInterface hTable, Slice slice, IIKeyValueCodec codec) throws IOException {
-        List<Put> data = Lists.newArrayList();
-        for (IIRow row : codec.encodeKeyValue(slice)) {
-            final byte[] key = row.getKey().get();
-            final byte[] value = row.getValue().get();
-            Put put = new Put(key);
-            put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES, value);
-            final ImmutableBytesWritable dictionary = row.getDictionary();
-            final byte[] dictBytes = dictionary.get();
-            if (dictionary.getOffset() == 0 && dictionary.getLength() == dictBytes.length) {
-                put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_DICTIONARY_BYTES, dictBytes);
-            } else {
-                throw new RuntimeException("dict offset should be 0, and dict length should be " + dictBytes.length + " but they are" + dictionary.getOffset() + " " + dictionary.getLength());
-            }
-            data.add(put);
-        }
-        hTable.put(data);
-        //omit hTable.flushCommits(), because htable is auotflush
-    }
-
-    private void submitOffset(long offset) {
-        try {
-            streamingManager.updateOffset(streaming, shardId, offset);
-            logger.info("submit offset:" + offset);
-        } catch (Exception e) {
-            logger.warn("error submit offset: " + offset + " retrying", e);
-            throw new RuntimeException(e);
-        }
-    }
-
-}