You are viewing a plain-text version of this content; the canonical link to it (a hyperlink in the original page) is not preserved in this rendering.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/06/02 04:36:13 UTC

incubator-kylin git commit: refactor StreamBuilder to support parse and filter

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8.0 d27319b58 -> ad27ae28f


refactor StreamBuilder to support parse and filter


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/ad27ae28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/ad27ae28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/ad27ae28

Branch: refs/heads/0.8.0
Commit: ad27ae28fae82f22c3fa64aac4c0a971b0cc3a55
Parents: d27319b
Author: qianhao.zhou <qi...@ebay.com>
Authored: Tue Jun 2 00:55:33 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Tue Jun 2 10:36:30 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/streaming/CubeStreamBuilder.java  |  29 ++----
 .../kylin/streaming/DefaultStreamFilter.java    |  15 +++
 .../kylin/streaming/JsonStreamParser.java       |   4 +-
 .../kylin/streaming/MicroStreamBatch.java       |  38 +++++++
 .../kylin/streaming/ParsedStreamMessage.java    |  32 ++++++
 .../kylin/streaming/SEOJsonStreamParser.java    | 100 -------------------
 .../apache/kylin/streaming/StreamBuilder.java   |  86 +++++++++++-----
 .../apache/kylin/streaming/StreamFilter.java    |   8 ++
 .../apache/kylin/streaming/StreamParser.java    |   4 +-
 .../kylin/streaming/StringStreamParser.java     |   6 +-
 .../invertedindex/IIStreamBuilder.java          |  13 +--
 .../streaming/invertedindex/SliceBuilder.java   |  14 +--
 12 files changed, 178 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad27ae28/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
index 6678da9..79fd674 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
@@ -20,14 +20,9 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
 import org.apache.kylin.common.persistence.HBaseConnection;
@@ -54,7 +49,6 @@ import org.apache.kylin.dict.lookup.TableSignature;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsReducer;
 import org.apache.kylin.job.hadoop.cubev2.InMemKeyValueCreator;
-import org.apache.kylin.job.hadoop.hbase.CreateHTableJob;
 import org.apache.kylin.job.hadoop.hbase.CubeHTableUtil;
 import org.apache.kylin.job.inmemcubing.ICuboidWriter;
 import org.apache.kylin.job.inmemcubing.InMemCubeBuilder;
@@ -62,7 +56,7 @@ import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.storage.cube.CuboidToGridTableMapping;
 import org.apache.kylin.storage.gridtable.GTRecord;
-import org.apache.kylin.streaming.SEOJsonStreamParser;
+import org.apache.kylin.streaming.MicroStreamBatch;
 import org.apache.kylin.streaming.StreamBuilder;
 import org.apache.kylin.streaming.StreamMessage;
 import org.slf4j.Logger;
@@ -94,18 +88,17 @@ public class CubeStreamBuilder extends StreamBuilder {
         this.kylinConfig = KylinConfig.getInstanceFromEnv();
         this.cubeManager = CubeManager.getInstance(kylinConfig);
         this.cubeName = cubeName;
-        setStreamParser(new SEOJsonStreamParser(cubeManager.getCube(cubeName).getAllColumns()));
     }
 
     @Override
-    protected void build(List<StreamMessage> streamMessages) throws Exception {
-        if (CollectionUtils.isEmpty(streamMessages)) {
+    protected void build(MicroStreamBatch microStreamBatch) throws Exception {
+        if (microStreamBatch.size() == 0) {
             logger.info("nothing to build, skip to next iteration");
             return;
         }
-        final List<List<String>> parsedStreamMessages = parseStream(streamMessages);
-        long startOffset = streamMessages.get(0).getOffset();
-        long endOffset = streamMessages.get(streamMessages.size() - 1).getOffset();
+        final List<List<String>> parsedStreamMessages = microStreamBatch.getStreams();
+        long startOffset = microStreamBatch.getOffset().getFirst();
+        long endOffset = microStreamBatch.getOffset().getSecond();
         LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue<List<String>>(parsedStreamMessages);
         blockingQueue.put(Collections.<String>emptyList());
 
@@ -374,16 +367,6 @@ public class CubeStreamBuilder extends StreamBuilder {
     }
 
 
-    private List<List<String>> parseStream(List<StreamMessage> streamMessages) {
-        return Lists.transform(streamMessages, new Function<StreamMessage, List<String>>() {
-            @Nullable
-            @Override
-            public List<String> apply(StreamMessage input) {
-                return getStreamParser().parse(input);
-            }
-        });
-    }
-
     private HTableInterface createHTable(final CubeSegment cubeSegment) throws Exception {
         final String hTableName = cubeSegment.getStorageLocationIdentifier();
         CubeHTableUtil.createHTable(cubeSegment.getCubeDesc(), hTableName, null);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad27ae28/streaming/src/main/java/org/apache/kylin/streaming/DefaultStreamFilter.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/DefaultStreamFilter.java b/streaming/src/main/java/org/apache/kylin/streaming/DefaultStreamFilter.java
new file mode 100644
index 0000000..875ae4f
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/DefaultStreamFilter.java
@@ -0,0 +1,15 @@
+package org.apache.kylin.streaming;
+
+/** Default {@link StreamFilter}: accepts every parsed message and rejects only a null parse result. */
+public final class DefaultStreamFilter implements StreamFilter {
+    /** Shared instance; the filter has no state, so the constructor is private. */
+    public static final DefaultStreamFilter instance = new DefaultStreamFilter();
+
+    private DefaultStreamFilter() {
+    }
+
+    @Override
+    public boolean filter(ParsedStreamMessage streamMessage) {
+        return streamMessage != null; // the parser's result is handed in unchecked, so guard against null here
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad27ae28/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java b/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java
index 7d730b7..78de231 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java
@@ -64,7 +64,7 @@ public final class JsonStreamParser implements StreamParser {
     }
 
     @Override
-    public List<String> parse(StreamMessage streamMessage) {
+    public ParsedStreamMessage parse(StreamMessage streamMessage) {
         try {
             Map<String, String> json = objectMapper.readValue(streamMessage.getRawData(), javaType);
             ArrayList<String> result = Lists.newArrayList();
@@ -75,7 +75,7 @@ public final class JsonStreamParser implements StreamParser {
                     }
                 }
             }
-            return result;
+            return new ParsedStreamMessage(result, streamMessage.getOffset(), streamMessage.getOffset());
         } catch (IOException e) {
             logger.error("error parsing stream", e);
             throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad27ae28/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java b/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
new file mode 100644
index 0000000..0adcee2
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
@@ -0,0 +1,38 @@
+package org.apache.kylin.streaming;
+
+import org.apache.kylin.common.util.Pair;
+import java.util.List;
+
+/** A micro batch of parsed rows plus the timestamp and offset ranges they cover. StreamBuilder fills each
+ *  range as (first, second) = (min, max) over the messages it accepted into the batch. */
+public final class MicroStreamBatch {
+    /** Parsed rows, one list of column values per accepted message (stored as given, not copied). */
+    private final List<List<String>> streams;
+    /** (min, max) timestamp of the rows. */
+    private final Pair<Long, Long> timestamp;
+    /** (min, max) offset of the rows. */
+    private final Pair<Long, Long> offset;
+
+    // NOTE: argument order is (rows, timestamp, offset), whereas ParsedStreamMessage takes (row, offset, timestamp).
+    public MicroStreamBatch(List<List<String>> streams, Pair<Long, Long> timestamp, Pair<Long, Long> offset) {
+        this.streams = streams;
+        this.timestamp = timestamp;
+        this.offset = offset;
+    }
+
+    public List<List<String>> getStreams() {
+        return streams;
+    }
+
+    public Pair<Long, Long> getTimestamp() {
+        return timestamp;
+    }
+
+    public Pair<Long, Long> getOffset() {
+        return offset;
+    }
+
+    public int size() {
+        return streams.size();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad27ae28/streaming/src/main/java/org/apache/kylin/streaming/ParsedStreamMessage.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/ParsedStreamMessage.java b/streaming/src/main/java/org/apache/kylin/streaming/ParsedStreamMessage.java
new file mode 100644
index 0000000..1ebe506
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/ParsedStreamMessage.java
@@ -0,0 +1,32 @@
+package org.apache.kylin.streaming;
+
+import java.util.List;
+
+/** A stream message after parsing: its column values plus the offset and timestamp the parser attached.
+ *  All fields are final; the value list is stored as given, not copied. */
+public class ParsedStreamMessage {
+    /** Parsed column values, as produced by the {@link StreamParser}. */
+    private final List<String> streamMessage;
+    /** Offset attached by the parser; the bundled parsers use the source StreamMessage's offset. */
+    private final long offset;
+    /** Timestamp attached by the parser. NOTE(review): JsonStreamParser and StringStreamParser pass the offset here. */
+    private final long timestamp;
+
+    public ParsedStreamMessage(List<String> streamMessage, long offset, long timestamp) {
+        this.streamMessage = streamMessage;
+        this.offset = offset;
+        this.timestamp = timestamp;
+    }
+
+    public final List<String> getStreamMessage() {
+        return streamMessage;
+    }
+
+    public final long getOffset() {
+        return offset;
+    }
+
+    public final long getTimestamp() {
+        return timestamp;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad27ae28/streaming/src/main/java/org/apache/kylin/streaming/SEOJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/SEOJsonStreamParser.java b/streaming/src/main/java/org/apache/kylin/streaming/SEOJsonStreamParser.java
deleted file mode 100644
index 5dab7f9..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/SEOJsonStreamParser.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming;
-
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.MapType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-import com.google.common.collect.Lists;
-import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.common.util.TimeUtil;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- */
-public final class SEOJsonStreamParser implements StreamParser {
-
-    private static final Logger logger = LoggerFactory.getLogger(SEOJsonStreamParser.class);
-
-    private final List<TblColRef> allColumns;
-    private final ObjectMapper mapper = new ObjectMapper();
-    private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class));
-
-    public SEOJsonStreamParser(List<TblColRef> allColumns) {
-        this.allColumns = allColumns;
-    }
-
-    @Override
-    public List<String> parse(StreamMessage stream) {
-        try {
-            Map<String, String> root = mapper.readValue(stream.getRawData(), mapType);
-            String trafficSource = root.get("trafficsourceid");
-            if ("20".equals(trafficSource) || "21".equals(trafficSource) || "22".equals(trafficSource) || "23".equals(trafficSource)) {
-                ArrayList<String> result = Lists.newArrayList();
-                for (TblColRef column : allColumns) {
-                    String columnName = column.getName();
-                    if (columnName.equalsIgnoreCase("minute_start")) {
-                        result.add(String.valueOf(TimeUtil.getMinuteStart(Long.valueOf(root.get("timestamp")))));
-                    } else if (columnName.equalsIgnoreCase("hour_start")) {
-                        result.add(String.valueOf(TimeUtil.getHourStart(Long.valueOf(root.get("timestamp")))));
-                    } else if (columnName.equalsIgnoreCase("day")) {
-                        //of day start we'll add yyyy-mm-dd
-                        long ts = TimeUtil.getDayStart(Long.valueOf(root.get("timestamp")));
-                        result.add(DateFormat.formatToDateStr(ts));
-                    } else {
-                        String x = root.get(columnName.toLowerCase());
-                        result.add(x);
-                    }
-                }
-
-                return result;
-            } else {
-                return null;
-            }
-        } catch (IOException e) {
-            logger.error("error parsing:" + new String(stream.getRawData()), e);
-            return null;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad27ae28/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index e9cb046..cb5dc1d 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -35,6 +35,8 @@
 package org.apache.kylin.streaming;
 
 import com.google.common.collect.Lists;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.kylin.common.util.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,57 +53,87 @@ public abstract class StreamBuilder implements Runnable {
 
     private StreamParser streamParser = StringStreamParser.instance;
 
+    private StreamFilter streamFilter = DefaultStreamFilter.instance;
+
     private BlockingQueue<StreamMessage> streamMessageQueue;
     private long lastBuildTime = System.currentTimeMillis();
 
+    private long startOffset;
+    private long endOffset;
+
+    private long startTimestamp;
+    private long endTimestamp;
+
     public StreamBuilder(BlockingQueue<StreamMessage> streamMessageQueue) {
         this.streamMessageQueue = streamMessageQueue;
     }
 
-    protected abstract void build(List<StreamMessage> streamsToBuild) throws Exception;
+    protected abstract void build(MicroStreamBatch microStreamBatch) throws Exception;
 
     protected abstract void onStop();
 
     private void clearCounter() {
         lastBuildTime = System.currentTimeMillis();
+        startOffset = Long.MAX_VALUE;
+        endOffset = Long.MIN_VALUE;
+        startTimestamp = Long.MAX_VALUE;
+        endTimestamp = Long.MIN_VALUE;
     }
 
     @Override
     public void run() {
         try {
-            List<StreamMessage> streamMessageToBuild = Lists.newArrayList();
-            clearCounter();
+            List<List<String>> parsedStreamMessages = null;
             while (true) {
+                if (parsedStreamMessages == null) {
+                    parsedStreamMessages = Lists.newLinkedList();
+                    clearCounter();
+                }
                 StreamMessage streamMessage;
                 try {
                     streamMessage = streamMessageQueue.poll(30, TimeUnit.SECONDS);
                 } catch (InterruptedException e) {
-                    logger.warn("stream queue interrupted", e);
+                    logger.warn("stream queue should not be interrupted", e);
                     continue;
                 }
-                if (streamMessage == null || getStreamParser().parse(streamMessage) == null) {
-                    if (streamMessage == null) {
-                        logger.info("The stream queue is drained, current available stream count: " + streamMessageToBuild.size());
-                    }
-                    if ((System.currentTimeMillis() - lastBuildTime) > batchInterval()) {
-                        build(streamMessageToBuild);
-                        clearCounter();
-                        streamMessageToBuild.clear();
+                if (streamMessage == null) {
+                    logger.info("The stream queue is drained, current available stream count: " + parsedStreamMessages.size());
+                    if ((System.currentTimeMillis() - lastBuildTime) > batchInterval() && !parsedStreamMessages.isEmpty()) {
+                        build(new MicroStreamBatch(parsedStreamMessages, Pair.newPair(startTimestamp, endTimestamp), Pair.newPair(startOffset, endOffset)));
+                        parsedStreamMessages = null;
                     }
                     continue;
-                } else {
-                    if (streamMessage.getOffset() < 0) {
-                        onStop();
-                        logger.warn("streaming encountered EOF, stop building");
-                        break;
-                    }
                 }
-                streamMessageToBuild.add(streamMessage);
-                if (streamMessageToBuild.size() >= batchSize()) {
-                    build(streamMessageToBuild);
-                    clearCounter();
-                    streamMessageToBuild.clear();
+                if (streamMessage.getOffset() < 0) {
+                    onStop();
+                    logger.warn("streaming encountered EOF, stop building");
+                    break;
                 }
+
+                final ParsedStreamMessage parsedStreamMessage = getStreamParser().parse(streamMessage);
+
+                if (getStreamFilter().filter(parsedStreamMessage)) {
+                    if (startOffset > parsedStreamMessage.getOffset()) {
+                        startOffset = parsedStreamMessage.getOffset();
+                    }
+                    if (endOffset < parsedStreamMessage.getOffset()) {
+                        endOffset = parsedStreamMessage.getOffset();
+                    }
+                    if (startTimestamp > parsedStreamMessage.getTimestamp()) {
+                        startTimestamp = parsedStreamMessage.getTimestamp();
+                    }
+                    if (endTimestamp < parsedStreamMessage.getTimestamp()) {
+                        endTimestamp = parsedStreamMessage.getTimestamp();
+                    }
+                    parsedStreamMessages.add(parsedStreamMessage.getStreamMessage());
+                    if (parsedStreamMessages.size() >= batchSize()) {
+                        build(new MicroStreamBatch(parsedStreamMessages, Pair.newPair(startTimestamp, endTimestamp), Pair.newPair(startOffset, endOffset)));
+                        parsedStreamMessages = null;
+                    }
+                } else {
+                    //ignore unfiltered stream message
+                }
+
             }
         } catch (Exception e) {
             logger.error("build stream error, stop building", e);
@@ -117,6 +149,14 @@ public abstract class StreamBuilder implements Runnable {
         this.streamParser = streamParser;
     }
 
+    /** Filter applied to each parsed message before it joins a batch; defaults to DefaultStreamFilter.instance. */
+    public final StreamFilter getStreamFilter() {
+        return streamFilter;
+    }
+
+    /** Replaces the filter. Must not be null: run() invokes it for every parsed message. */
+    public final void setStreamFilter(StreamFilter streamFilter) { this.streamFilter = streamFilter; }
+
     protected abstract int batchInterval();
     protected abstract int batchSize();
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad27ae28/streaming/src/main/java/org/apache/kylin/streaming/StreamFilter.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamFilter.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamFilter.java
new file mode 100644
index 0000000..e590320
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamFilter.java
@@ -0,0 +1,8 @@
+package org.apache.kylin.streaming;
+
+/** Decides which parsed messages StreamBuilder adds to the current micro batch. */
+public interface StreamFilter {
+    /** Returns true to keep {@code streamMessage}, false to drop it. The argument is the parser's raw result and
+     *  may be null; return false for null, since StreamBuilder dereferences every message that is kept. */
+    boolean filter(ParsedStreamMessage streamMessage);
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad27ae28/streaming/src/main/java/org/apache/kylin/streaming/StreamParser.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamParser.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamParser.java
index 304e252..f541f36 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamParser.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamParser.java
@@ -34,11 +34,9 @@
 
 package org.apache.kylin.streaming;
 
-import java.util.List;
-
 /**
  */
 public interface StreamParser {
 
-    List<String> parse(StreamMessage streamMessage);
+    ParsedStreamMessage parse(StreamMessage streamMessage);
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad27ae28/streaming/src/main/java/org/apache/kylin/streaming/StringStreamParser.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StringStreamParser.java b/streaming/src/main/java/org/apache/kylin/streaming/StringStreamParser.java
index 14e1b45..f0e335b 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StringStreamParser.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StringStreamParser.java
@@ -34,8 +34,6 @@
 
 package org.apache.kylin.streaming;
 
-import java.util.List;
-
 import com.google.common.collect.Lists;
 
 /**
@@ -46,7 +44,7 @@ public final class StringStreamParser implements StreamParser {
 
     private StringStreamParser(){}
     @Override
-    public List<String> parse(StreamMessage streamMessage) {
-        return Lists.newArrayList(new String(streamMessage.getRawData()).split(","));
+    public ParsedStreamMessage parse(StreamMessage streamMessage) {
+        return new ParsedStreamMessage(Lists.newArrayList(new String(streamMessage.getRawData()).split(",")), streamMessage.getOffset(), streamMessage.getOffset());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad27ae28/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
index 56435bd..cf86b44 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
@@ -45,6 +45,7 @@ import org.apache.kylin.invertedindex.index.Slice;
 import org.apache.kylin.invertedindex.model.IIDesc;
 import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
 import org.apache.kylin.invertedindex.model.IIRow;
+import org.apache.kylin.streaming.MicroStreamBatch;
 import org.apache.kylin.streaming.StreamMessage;
 import org.apache.kylin.streaming.StreamBuilder;
 import org.apache.kylin.streaming.StreamingManager;
@@ -89,23 +90,23 @@ public class IIStreamBuilder extends StreamBuilder {
     }
 
     @Override
-    protected void build(List<StreamMessage> streamsToBuild) throws IOException {
-        if (streamsToBuild.size() > 0) {
-            long offset = streamsToBuild.get(0).getOffset();
+    protected void build(MicroStreamBatch microStreamBatch) throws IOException {
+        if (microStreamBatch.size() > 0) {
+            long offset = microStreamBatch.getOffset().getFirst();
             if (offset < streamingManager.getOffset(streaming, shardId)) {
                 logger.info("this batch has already been built, skip building");
                 return;
             }
-            logger.info("stream build start, size:" + streamsToBuild.size());
+            logger.info("stream build start, size:" + microStreamBatch.size());
             Stopwatch stopwatch = new Stopwatch();
             stopwatch.start();
-            final Slice slice = sliceBuilder.buildSlice(streamsToBuild, getStreamParser());
+            final Slice slice = sliceBuilder.buildSlice(microStreamBatch);
             logger.info("slice info, shard:" + slice.getShard() + " timestamp:" + slice.getTimestamp() + " record count:" + slice.getRecordCount());
 
             loadToHBase(hTable, slice, new IIKeyValueCodec(slice.getInfo()));
             submitOffset(offset);
             stopwatch.stop();
-            logger.info("stream build finished, size:" + streamsToBuild.size() + " elapsed time:" + stopwatch.elapsedTime(TimeUnit.MILLISECONDS) + " " + TimeUnit.MILLISECONDS);
+            logger.info("stream build finished, size:" + microStreamBatch.size() + " elapsed time:" + stopwatch.elapsedTime(TimeUnit.MILLISECONDS) + " " + TimeUnit.MILLISECONDS);
         } else {
             logger.info("nothing to build, skip building");
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad27ae28/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/SliceBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/SliceBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/SliceBuilder.java
index b6769a9..604bf9c 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/SliceBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/SliceBuilder.java
@@ -43,6 +43,7 @@ import org.apache.kylin.invertedindex.index.TableRecord;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
 import org.apache.kylin.invertedindex.model.IIDesc;
 import org.apache.kylin.invertedindex.util.IIDictionaryBuilder;
+import org.apache.kylin.streaming.MicroStreamBatch;
 import org.apache.kylin.streaming.StreamMessage;
 import org.apache.kylin.streaming.StreamParser;
 import org.slf4j.Logger;
@@ -68,17 +69,10 @@ public final class SliceBuilder {
     }
 
 
-    public Slice buildSlice(List<StreamMessage> streamMessages, final StreamParser streamParser) {
-        List<List<String>> table = Lists.transform(streamMessages, new Function<StreamMessage, List<String>>() {
-            @Nullable
-            @Override
-            public List<String> apply(@Nullable StreamMessage input) {
-                return streamParser.parse(input);
-            }
-        });
-        final Dictionary<?>[] dictionaries = useLocalDict ? IIDictionaryBuilder.buildDictionary(table, iiDesc) : new Dictionary[iiDesc.listAllColumns().size()];
+    public Slice buildSlice(MicroStreamBatch microStreamBatch) {
+        final Dictionary<?>[] dictionaries = useLocalDict ? IIDictionaryBuilder.buildDictionary(microStreamBatch.getStreams(), iiDesc) : new Dictionary[iiDesc.listAllColumns().size()];
         TableRecordInfo tableRecordInfo = new TableRecordInfo(iiDesc, dictionaries);
-        return build(table, tableRecordInfo, dictionaries);
+        return build(microStreamBatch.getStreams(), tableRecordInfo, dictionaries);
     }
 
     private Slice build(List<List<String>> table, final TableRecordInfo tableRecordInfo, Dictionary<?>[] localDictionary) {