You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/09/23 05:29:22 UTC

[08/11] incubator-kylin git commit: refactor

refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/f5c55d7b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/f5c55d7b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/f5c55d7b

Branch: refs/heads/KYLIN-1011
Commit: f5c55d7b431486b785ccafb4de904d43809480b9
Parents: 55a85df
Author: qianhao.zhou <qi...@ebay.com>
Authored: Tue Sep 22 16:48:39 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Tue Sep 22 17:29:32 2015 +0800

----------------------------------------------------------------------
 assembly/pom.xml                                |   5 -
 .../kylin/job/BuildCubeWithStreamTest.java      |  35 +++--
 .../apache/kylin/job/BuildIIWithStreamTest.java |  43 +++---
 .../java/org/apache/kylin/job/DeployUtil.java   |  32 ++---
 .../job/ITKafkaBasedIIStreamBuilderTest.java    |  85 -----------
 .../kylin/job/hadoop/invertedindex/IITest.java  |  39 ++---
 .../job/streaming/CubeStreamConsumerTest.java   |  90 ------------
 .../kylin/job/streaming/KafkaDataLoader.java    |  11 +-
 .../streaming/PeriodicalStreamBuilderTest.java  | 144 -------------------
 .../invertedindex/streaming/SliceBuilder.java   |  81 +++++++++++
 .../source/kafka/StringStreamingParser.java     |  65 +++++++++
 11 files changed, 220 insertions(+), 410 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f5c55d7b/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 99557fb..9f17913 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -49,11 +49,6 @@
             <artifactId>kylin-invertedindex</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-streaming</artifactId>
-            <version>${project.parent.version}</version>
-        </dependency>
         
         <!-- Env & Test -->
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f5c55d7b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
index b02b2f2..6bfd560 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
@@ -34,21 +34,18 @@
 
 package org.apache.kylin.job;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.UUID;
-
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractKylinTestCase;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.job.streaming.BootstrapConfig;
-import org.apache.kylin.job.streaming.StreamingBootstrap;
+import org.apache.kylin.engine.streaming.OneOffStreamingBuilder;
+import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.source.kafka.KafkaConfigManager;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
 import org.apache.kylin.storage.hbase.util.StorageCleanupJob;
-import org.apache.kylin.streaming.StreamingConfig;
-import org.apache.kylin.streaming.StreamingManager;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -56,6 +53,10 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+
 /**
  *  for streaming cubing case "test_streaming_table"
  */
@@ -84,12 +85,14 @@ public class BuildCubeWithStreamTest {
 
         kylinConfig = KylinConfig.getInstanceFromEnv();
 
-        //Use a random toplic for kafka data stream
-        StreamingConfig streamingConfig = StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamingName);
+        final StreamingConfig config = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(streamingName);
+
+        //Use a random topic for kafka data stream
+        KafkaConfig streamingConfig = KafkaConfigManager.getInstance(kylinConfig).getStreamingConfig(streamingName);
         streamingConfig.setTopic(UUID.randomUUID().toString());
-        StreamingManager.getInstance(kylinConfig).saveStreamingConfig(streamingConfig);
+        KafkaConfigManager.getInstance(kylinConfig).saveStreamingConfig(streamingConfig);
 
-        DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, streamingConfig);
+        DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, config.getCubeName(), streamingConfig);
     }
 
     @AfterClass
@@ -120,13 +123,7 @@ public class BuildCubeWithStreamTest {
     @Test
     public void test() throws Exception {
         for (long start = startTime; start < endTime; start += batchInterval) {
-            BootstrapConfig bootstrapConfig = new BootstrapConfig();
-            bootstrapConfig.setStart(start);
-            bootstrapConfig.setEnd(start + batchInterval);
-            bootstrapConfig.setOneOff(true);
-            bootstrapConfig.setPartitionId(0);
-            bootstrapConfig.setStreaming(streamingName);
-            StreamingBootstrap.getInstance(KylinConfig.getInstanceFromEnv()).start(bootstrapConfig);
+            new OneOffStreamingBuilder(streamingName, start, start + batchInterval).build().run();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f5c55d7b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
index 5ca3b29..89be628 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
@@ -39,11 +39,7 @@ import static org.junit.Assert.fail;
 import java.io.File;
 import java.io.IOException;
 import java.text.SimpleDateFormat;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.TimeZone;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -55,12 +51,16 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractKylinTestCase;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.streaming.StreamingBatch;
+import org.apache.kylin.engine.streaming.StreamingMessage;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.invertedindex.model.IIDesc;
 import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
+import org.apache.kylin.invertedindex.streaming.SliceBuilder;
 import org.apache.kylin.job.common.ShellExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
@@ -69,9 +69,6 @@ import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.source.hive.HiveTableReader;
 import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
-import org.apache.kylin.streaming.StreamBuilder;
-import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.invertedindex.IIStreamConsumer;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -184,32 +181,30 @@ public class BuildIIWithStreamTest {
                 logger.info("measure:" + tblColRef.getName());
             }
         }
-        LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<StreamMessage>();
         final IISegment segment = createSegment(iiName);
         String[] args = new String[] { "-iiname", iiName, "-htablename", segment.getStorageLocationIdentifier() };
         ToolRunner.run(new IICreateHTableJob(), args);
 
-        ExecutorService executorService = Executors.newSingleThreadExecutor();
-        final StreamBuilder streamBuilder = StreamBuilder.newLimitedSizeStreamBuilder(iiName, queue, new IIStreamConsumer(iiName, segment.getStorageLocationIdentifier(), segment.getIIDesc(), 0), 0, segment.getIIDesc().getSliceSize());
+        final IIDesc iiDesc = segment.getIIDesc();
+        final SliceBuilder sliceBuilder = new SliceBuilder(desc, (short) 0, iiDesc.isUseLocalDictionary());
 
         List<String[]> sorted = getSortedRows(reader, desc.getTimestampColumn());
         int count = sorted.size();
+        ArrayList<StreamingMessage> messages = Lists.newArrayList();
         for (String[] row : sorted) {
-            logger.info("another row: " + StringUtils.join(row, ","));
-            queue.put(parse(row));
+            if (messages.size() >= iiDesc.getSliceSize()) {
+                messages.add(parse(row));
+            } else {
+                sliceBuilder.buildSlice(new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())));
+                messages = Lists.newArrayList();
+            }
+        }
+        if (!messages.isEmpty()) {
+            sliceBuilder.buildSlice(new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())));
         }
 
         reader.close();
         logger.info("total record count:" + count + " htable:" + segment.getStorageLocationIdentifier());
-        queue.put(StreamMessage.EOF);
-        final Future<?> future = executorService.submit(streamBuilder);
-        try {
-            future.get();
-        } catch (Exception e) {
-            logger.error("stream build failed", e);
-            fail("stream build failed");
-        }
-
         logger.info("stream build finished, htable name:" + segment.getStorageLocationIdentifier());
     }
 
@@ -225,8 +220,8 @@ public class BuildIIWithStreamTest {
         }
     }
 
-    private StreamMessage parse(String[] row) {
-        return new StreamMessage(System.currentTimeMillis(), StringUtils.join(row, ",").getBytes());
+    private StreamingMessage parse(String[] row) {
+        return new StreamingMessage(Lists.newArrayList(row), System.currentTimeMillis(), System.currentTimeMillis(), Collections.<String, Object>emptyMap());
     }
 
     private List<String[]> getSortedRows(HiveTableReader reader, final int tsCol) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f5c55d7b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index d47a664..9ec4c88 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -18,14 +18,9 @@
 
 package org.apache.kylin.job;
 
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-
+import com.google.common.collect.Lists;
+import kafka.message.Message;
+import kafka.message.MessageAndOffset;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
@@ -35,6 +30,7 @@ import org.apache.kylin.common.util.AbstractKylinTestCase;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.engine.streaming.StreamingConfig;
 import org.apache.kylin.job.dataGen.FactTableGenerator;
 import org.apache.kylin.job.streaming.KafkaDataLoader;
 import org.apache.kylin.job.streaming.StreamingTableDataGenerator;
@@ -43,15 +39,16 @@ import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.hive.HiveClient;
-import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.StreamingConfig;
-import org.apache.kylin.streaming.TimedJsonStreamParser;
+import org.apache.kylin.source.kafka.KafkaConfigManager;
+import org.apache.kylin.source.kafka.TimedJsonStreamParser;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.apache.maven.model.Model;
 import org.apache.maven.model.io.xpp3.MavenXpp3Reader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
+import java.io.*;
+import java.util.List;
 
 public class DeployUtil {
     private static final Logger logger = LoggerFactory.getLogger(DeployUtil.class);
@@ -146,14 +143,13 @@ public class DeployUtil {
         deployHiveTables();
     }
 
-    public static void prepareTestDataForStreamingCube(long startTime, long endTime, StreamingConfig streamingConfig) throws IOException {
-        CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(streamingConfig.getCubeName());
+    public static void prepareTestDataForStreamingCube(long startTime, long endTime, String cubeName, KafkaConfig kafkaConfig) throws IOException {
+        CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
         List<String> data = StreamingTableDataGenerator.generate(10000, startTime, endTime, cubeInstance.getFactTable());
         TableDesc tableDesc = cubeInstance.getFactTableDesc();
-
         //load into kafka
-        KafkaDataLoader.loadIntoKafka(streamingConfig, data);
-        logger.info("Write {} messages into topic {}", data.size(), streamingConfig.getTopic());
+        KafkaDataLoader.loadIntoKafka(kafkaConfig.getKafkaClusterConfigs(), data);
+        logger.info("Write {} messages into topic {}", data.size(), kafkaConfig.getTopic());
 
         //csv data for H2 use
         List<TblColRef> tableColumns = Lists.newArrayList();
@@ -163,7 +159,7 @@ public class DeployUtil {
         TimedJsonStreamParser timedJsonStreamParser = new TimedJsonStreamParser(tableColumns, "formatTs=true");
         StringBuilder sb = new StringBuilder();
         for (String json : data) {
-            List<String> rowColumns = timedJsonStreamParser.parse(new StreamMessage(0, json.getBytes())).getStreamMessage();
+            List<String> rowColumns = timedJsonStreamParser.parse(new MessageAndOffset(new Message(json.getBytes()), 0)).getData();
             sb.append(StringUtils.join(rowColumns, ","));
             sb.append(System.getProperty("line.separator"));
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f5c55d7b/assembly/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java b/assembly/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java
deleted file mode 100644
index 6a615cb..0000000
--- a/assembly/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.job;
-
-import java.io.File;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.job.streaming.StreamingBootstrap;
-import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-@Ignore("this test case will break existing metadata store")
-public class ITKafkaBasedIIStreamBuilderTest {
-
-    private static final Logger logger = LoggerFactory.getLogger(ITKafkaBasedIIStreamBuilderTest.class);
-
-    private KylinConfig kylinConfig;
-
-    @BeforeClass
-    public static void beforeClass() throws Exception {
-        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        System.setProperty("hdp.version", "2.2.4.2-2"); // mapred-site.xml ref this
-    }
-
-    @Before
-    public void before() throws Exception {
-        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
-
-        kylinConfig = KylinConfig.getInstanceFromEnv();
-        DeployUtil.initCliWorkDir();
-        DeployUtil.deployMetadata();
-        DeployUtil.overrideJobJarLocations();
-    }
-
-    @Test
-    public void test() throws Exception {
-        final StreamingBootstrap bootstrap = StreamingBootstrap.getInstance(kylinConfig);
-        bootstrap.start("eagle", 0);
-        Thread.sleep(30 * 60 * 1000);
-        logger.info("time is up, stop streaming");
-        bootstrap.stop();
-        Thread.sleep(5 * 1000);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f5c55d7b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index dcd460b..913a2f7 100644
--- a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -10,12 +10,18 @@ import java.util.Set;
 
 import javax.annotation.Nullable;
 
+import com.google.common.base.Function;
+import kafka.message.Message;
+import kafka.message.MessageAndOffset;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.kylin.common.util.FIFOIterable;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.engine.streaming.StreamingBatch;
+import org.apache.kylin.engine.streaming.StreamingMessage;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.index.Slice;
@@ -26,6 +32,7 @@ import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
 import org.apache.kylin.invertedindex.model.IIKeyValueCodecWithState;
 import org.apache.kylin.invertedindex.model.IIRow;
 import org.apache.kylin.invertedindex.model.KeyValueCodec;
+import org.apache.kylin.invertedindex.streaming.SliceBuilder;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
 import org.apache.kylin.metadata.filter.ConstantTupleFilter;
@@ -33,6 +40,8 @@ import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.ParameterDesc;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.kafka.StreamingParser;
+import org.apache.kylin.source.kafka.StringStreamingParser;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
@@ -41,18 +50,11 @@ import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.ClearTextDictionar
 import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.EndpointAggregators;
 import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.IIEndpoint;
 import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.generated.IIProtos;
-import org.apache.kylin.streaming.MicroStreamBatch;
-import org.apache.kylin.streaming.ParsedStreamMessage;
-import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.StreamParser;
-import org.apache.kylin.streaming.StringStreamParser;
-import org.apache.kylin.streaming.invertedindex.SliceBuilder;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
@@ -78,24 +80,23 @@ public class IITest extends LocalFileMetadataTestCase {
         this.ii = IIManager.getInstance(getTestConfig()).getII(iiName);
         this.iiDesc = ii.getDescriptor();
 
-        List<StreamMessage> streamMessages = Lists.transform(Arrays.asList(inputData), new Function<String, StreamMessage>() {
+        List<MessageAndOffset> messages = Lists.transform(Arrays.asList(inputData), new Function<String, MessageAndOffset>() {
             @Nullable
             @Override
-            public StreamMessage apply(String input) {
-                return new StreamMessage(System.currentTimeMillis(), input.getBytes());
+            public MessageAndOffset apply(String input) {
+                return new MessageAndOffset(new Message(input.getBytes()), System.currentTimeMillis());
             }
         });
 
-        List<List<String>> parsedStreamMessages = Lists.newArrayList();
-        StreamParser parser = StringStreamParser.instance;
-
-        MicroStreamBatch batch = new MicroStreamBatch(0);
-        for (StreamMessage message : streamMessages) {
-            ParsedStreamMessage parsedStreamMessage = parser.parse(message);
-            if ((parsedStreamMessage.isAccepted())) {
-                batch.add(parsedStreamMessage);
+        final StreamingParser parser = StringStreamingParser.instance;
+        final List<StreamingMessage> streamingMessages = Lists.transform(messages, new Function<MessageAndOffset, StreamingMessage>() {
+            @Nullable
+            @Override
+            public StreamingMessage apply(@Nullable MessageAndOffset input) {
+                return parser.parse(input);
             }
-        }
+        });
+        StreamingBatch batch = new StreamingBatch(streamingMessages, Pair.newPair(0L, System.currentTimeMillis()));
 
         iiRows = Lists.newArrayList();
         final Slice slice = new SliceBuilder(iiDesc, (short) 0, true).buildSlice((batch));

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f5c55d7b/assembly/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java b/assembly/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
deleted file mode 100644
index be4fa26..0000000
--- a/assembly/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package org.apache.kylin.job.streaming;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import org.apache.hadoop.util.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.job.DeployUtil;
-import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
-import org.apache.kylin.streaming.StreamBuilder;
-import org.apache.kylin.streaming.StreamMessage;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- */
-@Ignore
-public class CubeStreamConsumerTest {
-
-    private static final Logger logger = LoggerFactory.getLogger(CubeStreamConsumerTest.class);
-
-    private KylinConfig kylinConfig;
-
-    private static final String CUBE_NAME = "test_kylin_cube_without_slr_left_join_ready";
-
-    @BeforeClass
-    public static void beforeClass() throws Exception {
-        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        System.setProperty("hdp.version", "2.2.0.0-2041"); // mapred-site.xml ref this
-    }
-
-    @Before
-    public void before() throws Exception {
-        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
-
-        kylinConfig = KylinConfig.getInstanceFromEnv();
-        DeployUtil.initCliWorkDir();
-        DeployUtil.deployMetadata();
-        DeployUtil.overrideJobJarLocations();
-        final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(CUBE_NAME);
-        CubeUpdate cubeBuilder = new CubeUpdate(cube);
-        cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()]));
-        // remove all existing segments
-        CubeManager.getInstance(kylinConfig).updateCube(cubeBuilder);
-
-    }
-
-    @Test
-    public void test() throws Exception {
-        LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<>();
-        List<BlockingQueue<StreamMessage>> queues = Lists.newArrayList();
-        queues.add(queue);
-        StreamBuilder cubeStreamBuilder = StreamBuilder.newPeriodicalStreamBuilder(CUBE_NAME, queues, new CubeStreamConsumer(CUBE_NAME), System.currentTimeMillis(), 30L * 1000);
-        final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
-        loadDataFromLocalFile(queue, 100000);
-        future.get();
-    }
-
-    private void loadDataFromLocalFile(BlockingQueue<StreamMessage> queue, final int maxCount) throws IOException, InterruptedException {
-        BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("../table.txt")));
-        String line;
-        int count = 0;
-        while ((line = br.readLine()) != null && count++ < maxCount) {
-            final List<String> strings = Arrays.asList(line.split("\t"));
-            queue.put(new StreamMessage(System.currentTimeMillis(), StringUtils.join(",", strings).getBytes()));
-        }
-        queue.put(StreamMessage.EOF);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f5c55d7b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
index 95fbc9d..c3caa9b 100644
--- a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
+++ b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
@@ -10,22 +10,21 @@ import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.streaming.BrokerConfig;
-import org.apache.kylin.streaming.KafkaClusterConfig;
-import org.apache.kylin.streaming.StreamingConfig;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.Lists;
+import org.apache.kylin.source.kafka.config.BrokerConfig;
+import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
 
 /**
  * Load prepared data into kafka(for test use)
  */
 public class KafkaDataLoader {
 
-    public static void loadIntoKafka(StreamingConfig streamingConfig, List<String> messages) {
+    public static void loadIntoKafka(List<KafkaClusterConfig> kafkaClusterConfigs, List<String> messages) {
 
-        KafkaClusterConfig clusterConfig = streamingConfig.getKafkaClusterConfigs().get(0);
+        KafkaClusterConfig clusterConfig = kafkaClusterConfigs.get(0);
         String brokerList = StringUtils.join(Collections2.transform(clusterConfig.getBrokerConfigs(), new Function<BrokerConfig, String>() {
             @Nullable
             @Override
@@ -44,7 +43,7 @@ public class KafkaDataLoader {
 
         List<KeyedMessage<String, String>> keyedMessages = Lists.newArrayList();
         for (int i = 0; i < messages.size(); ++i) {
-            KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(streamingConfig.getTopic(), String.valueOf(i), messages.get(i));
+            KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(clusterConfig.getTopic(), String.valueOf(i), messages.get(i));
             keyedMessages.add(keyedMessage);
         }
         producer.send(keyedMessages);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f5c55d7b/assembly/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java b/assembly/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java
deleted file mode 100644
index dc6d312..0000000
--- a/assembly/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java
+++ /dev/null
@@ -1,144 +0,0 @@
-package org.apache.kylin.job.streaming;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.common.util.TimeUtil;
-import org.apache.kylin.streaming.MicroStreamBatch;
-import org.apache.kylin.streaming.MicroStreamBatchConsumer;
-import org.apache.kylin.streaming.ParsedStreamMessage;
-import org.apache.kylin.streaming.StreamBuilder;
-import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.StreamParser;
-import org.apache.kylin.streaming.StreamingManager;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- */
-public class PeriodicalStreamBuilderTest extends LocalFileMetadataTestCase {
-
-    private static final Logger logger = LoggerFactory.getLogger(PeriodicalStreamBuilderTest.class);
-
-    @Before
-    public void setup() {
-        this.createTestMetadata();
-
-    }
-
-    @After
-    public void clear() {
-        this.cleanupTestMetadata();
-    }
-
-    private List<StreamMessage> prepareTestData(long start, long end, int count) {
-        double step = (double) (end - start) / (count - 1);
-        long ts = start;
-        int offset = 0;
-        ArrayList<StreamMessage> result = Lists.newArrayList();
-        for (int i = 0; i < count - 1; ++i) {
-            result.add(new StreamMessage(offset++, String.valueOf(ts).getBytes()));
-            ts += step;
-        }
-        result.add(new StreamMessage(offset++, String.valueOf(end).getBytes()));
-        assertEquals(count, result.size());
-        assertEquals(start + "", new String(result.get(0).getRawData()));
-        assertEquals(end + "", new String(result.get(count - 1).getRawData()));
-        return result;
-    }
-
-    @Test
-    public void test() throws ExecutionException, InterruptedException {
-
-        List<BlockingQueue<StreamMessage>> queues = Lists.newArrayList();
-        queues.add(new LinkedBlockingQueue<StreamMessage>());
-        queues.add(new LinkedBlockingQueue<StreamMessage>());
-
-        final long interval = 3000L;
-        final long nextPeriodStart = TimeUtil.getNextPeriodStart(System.currentTimeMillis(), interval);
-
-        final List<Integer> partitionIds = Lists.newArrayList();
-        for (int i = 0; i < queues.size(); i++) {
-            partitionIds.add(i);
-        }
-
-        final MicroStreamBatchConsumer consumer = new MicroStreamBatchConsumer() {
-            @Override
-            public void consume(MicroStreamBatch microStreamBatch) throws Exception {
-                logger.info("consuming batch:" + microStreamBatch.getPartitionId() + " count:" + microStreamBatch.size() + " timestamp:" + microStreamBatch.getTimestamp() + " offset:" + microStreamBatch.getOffset());
-            }
-
-            @Override
-            public void stop() {
-                logger.info("consumer stopped");
-            }
-        };
-        final StreamBuilder streamBuilder = StreamBuilder.newPeriodicalStreamBuilder("test", queues, consumer, nextPeriodStart, interval);
-
-        streamBuilder.setStreamParser(new StreamParser() {
-            @Override
-            public ParsedStreamMessage parse(StreamMessage streamMessage) {
-                return new ParsedStreamMessage(Collections.<String> emptyList(), streamMessage.getOffset(), Long.parseLong(new String(streamMessage.getRawData())), true);
-            }
-        });
-
-        Future<?> future = Executors.newSingleThreadExecutor().submit(streamBuilder);
-        long timeout = nextPeriodStart + interval;
-        int messageCount = 0;
-        int inPeriodMessageCount = 0;
-        int expectedOffset = 0;
-        logger.info("prepare to add StreamMessage");
-        while (true) {
-            long ts = System.currentTimeMillis();
-            if (ts >= timeout + interval) {
-                break;
-            }
-            if (ts >= nextPeriodStart && ts < timeout) {
-                inPeriodMessageCount++;
-            }
-            for (BlockingQueue<StreamMessage> queue : queues) {
-                queue.put(new StreamMessage(messageCount, String.valueOf(ts).getBytes()));
-            }
-            if (expectedOffset == 0 && ts >= timeout) {
-                expectedOffset = messageCount - 1;
-            }
-            messageCount++;
-            Thread.sleep(10);
-        }
-        logger.info("totally put " + messageCount + " StreamMessages");
-        logger.info("totally in period " + inPeriodMessageCount + " StreamMessages");
-
-        for (BlockingQueue<StreamMessage> queue : queues) {
-            queue.put(StreamMessage.EOF);
-        }
-
-        future.get();
-
-        for (BlockingQueue<StreamMessage> queue : queues) {
-            queue.take();
-        }
-
-        final Map<Integer, Long> offsets = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getOffset("test", partitionIds);
-        logger.info("offset:" + offsets);
-        for (Long offset : offsets.values()) {
-            assertEquals(expectedOffset, offset.longValue());
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f5c55d7b/invertedindex/src/main/java/org/apache/kylin/invertedindex/streaming/SliceBuilder.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/streaming/SliceBuilder.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/streaming/SliceBuilder.java
new file mode 100644
index 0000000..ba337c8
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/streaming/SliceBuilder.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.invertedindex.streaming;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.engine.streaming.StreamingBatch;
+import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.invertedindex.index.BatchSliceMaker;
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecord;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.util.IIDictionaryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+/**
+ */
+public final class SliceBuilder {
+
+    private static Logger logger = LoggerFactory.getLogger(SliceBuilder.class);
+
+    private final BatchSliceMaker sliceMaker;
+    private final IIDesc iiDesc;
+    private final boolean useLocalDict;
+
+    public SliceBuilder(IIDesc desc, short shard, boolean useLocalDict) {
+        this.iiDesc = desc;
+        this.sliceMaker = new BatchSliceMaker(desc, shard);
+        this.useLocalDict = useLocalDict;
+    }
+
+    public Slice buildSlice(StreamingBatch microStreamBatch) {
+        final List<List<String>> messages = Lists.transform(microStreamBatch.getMessages(), new Function<StreamingMessage, List<String>>() {
+            @Nullable
+            @Override
+            public List<String> apply(@Nullable StreamingMessage input) {
+                return input.getData();
+            }
+        });
+        final Dictionary<?>[] dictionaries = useLocalDict ? IIDictionaryBuilder.buildDictionary(messages, iiDesc) : new Dictionary[iiDesc.listAllColumns().size()];
+        TableRecordInfo tableRecordInfo = new TableRecordInfo(iiDesc, dictionaries);
+        return build(messages, tableRecordInfo, dictionaries);
+    }
+
+    private Slice build(List<List<String>> table, final TableRecordInfo tableRecordInfo, Dictionary<?>[] localDictionary) {
+        final Slice slice = sliceMaker.makeSlice(tableRecordInfo.getDigest(), Lists.transform(table, new Function<List<String>, TableRecord>() {
+            @Nullable
+            @Override
+            public TableRecord apply(@Nullable List<String> input) {
+                TableRecord result = tableRecordInfo.createTableRecord();
+                for (int i = 0; i < input.size(); i++) {
+                    result.setValueString(i, input.get(i));
+                }
+                return result;
+            }
+        }));
+        slice.setLocalDictionaries(localDictionary);
+        return slice;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f5c55d7b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
new file mode 100644
index 0000000..307f73a
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
@@ -0,0 +1,65 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.source.kafka;
+
+import com.google.common.collect.Lists;
+import kafka.message.MessageAndOffset;
+import org.apache.kylin.engine.streaming.StreamingMessage;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+/**
+ */
+public final class StringStreamingParser implements StreamingParser {
+
+    public static final StringStreamingParser instance = new StringStreamingParser();
+
+    private StringStreamingParser() {
+    }
+
+    @Override
+    public StreamingMessage parse(MessageAndOffset kafkaMessage) {
+        final ByteBuffer payload = kafkaMessage.message().payload();
+        byte[] bytes = new byte[payload.limit()];
+        payload.get(bytes);
+        return new StreamingMessage(Lists.newArrayList(new String(bytes).split(",")), kafkaMessage.offset(), kafkaMessage.offset(), Collections.<String, Object>emptyMap());
+    }
+
+    @Override
+    public boolean filter(StreamingMessage streamingMessage) {
+        return true;
+    }
+}