You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/09/20 03:43:49 UTC

[02/13] kylin git commit: Revert "KYLIN-1726 Scalable streaming cubing"

Revert "KYLIN-1726 Scalable streaming cubing"

This reverts commit 81c7323b633df88eedac8b319fc57f9b62b01a4a.


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/506cd783
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/506cd783
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/506cd783

Branch: refs/heads/master
Commit: 506cd783132023a06f1669ad248b74bf9d96d0e1
Parents: 1f48804
Author: Hongbin Ma <ma...@apache.org>
Authored: Mon Sep 19 23:55:54 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Tue Sep 20 11:43:08 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/job/DeployUtil.java   |  35 +--
 .../kylin/job/streaming/Kafka10DataLoader.java  |  80 -------
 .../apache/kylin/common/KylinConfigBase.java    |   1 -
 .../java/org/apache/kylin/cube/CubeSegment.java |   1 -
 .../java/org/apache/kylin/cube/ISegment.java    |  39 ++++
 .../cube/gridtable/SegmentGTStartAndEnd.java    |   2 +-
 .../cube/model/CubeJoinedFlatTableDesc.java     |   6 -
 .../cube/model/CubeJoinedFlatTableEnrich.java   |   6 -
 .../apache/kylin/gridtable/ScannerWorker.java   |   2 +-
 .../metadata/model/IJoinedFlatTableDesc.java    |   2 -
 .../apache/kylin/metadata/model/ISegment.java   |  36 ---
 .../kylin/engine/mr/BatchMergeJobBuilder2.java  |   3 -
 .../org/apache/kylin/engine/mr/IMRInput.java    |  10 -
 .../java/org/apache/kylin/engine/mr/MRUtil.java |   4 -
 .../test_streaming_table_model_desc.json        |   6 +-
 .../kylin/provision/BuildCubeWithStream.java    | 218 +++++-------------
 .../org/apache/kylin/provision/MockKafka.java   | 191 ----------------
 .../apache/kylin/provision/NetworkUtils.java    |  52 -----
 pom.xml                                         |   2 +-
 .../apache/kylin/source/hive/HiveMRInput.java   |  11 -
 source-kafka/pom.xml                            |  13 +-
 .../kylin/source/kafka/KafkaConfigManager.java  |  46 ++--
 .../apache/kylin/source/kafka/KafkaMRInput.java | 221 -------------------
 .../apache/kylin/source/kafka/KafkaSource.java  |  57 -----
 .../kylin/source/kafka/KafkaStreamingInput.java |  65 +++---
 .../kylin/source/kafka/MergeOffsetStep.java     |  89 --------
 .../kylin/source/kafka/SeekOffsetStep.java      | 119 ----------
 .../kylin/source/kafka/StreamingParser.java     |  49 ++--
 .../source/kafka/StringStreamingParser.java     |  49 ++--
 .../source/kafka/TimedJsonStreamParser.java     |  49 ++--
 .../apache/kylin/source/kafka/TopicMeta.java    |  49 ++--
 .../kylin/source/kafka/UpdateTimeRangeStep.java | 108 ---------
 .../source/kafka/config/KafkaClusterConfig.java |   3 +-
 .../source/kafka/hadoop/KafkaFlatTableJob.java  | 165 --------------
 .../kafka/hadoop/KafkaFlatTableMapper.java      |  51 -----
 .../source/kafka/hadoop/KafkaInputFormat.java   |  98 --------
 .../kafka/hadoop/KafkaInputRecordReader.java    | 166 --------------
 .../source/kafka/hadoop/KafkaInputSplit.java    | 102 ---------
 .../kylin/source/kafka/util/KafkaClient.java    | 115 ----------
 .../source/kafka/util/KafkaOffsetMapping.java   |  97 --------
 .../kylin/source/kafka/util/KafkaRequester.java |  56 +++--
 .../kylin/source/kafka/util/KafkaUtils.java     |   3 +-
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     |   2 +-
 .../storage/hbase/cube/v2/CubeHBaseRPC.java     |   2 +-
 .../storage/hbase/cube/v2/CubeHBaseScanRPC.java |   2 +-
 45 files changed, 348 insertions(+), 2135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index 9b282e3..8c64f91 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -143,12 +143,14 @@ public class DeployUtil {
         deployHiveTables();
     }
 
-    public static void prepareTestDataForStreamingCube(long startTime, long endTime, int numberOfRecords, String cubeName, StreamDataLoader streamDataLoader) throws IOException {
+    public static void prepareTestDataForStreamingCube(long startTime, long endTime, String cubeName, StreamDataLoader streamDataLoader) throws IOException {
         CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
-        List<String> data = StreamingTableDataGenerator.generate(numberOfRecords, startTime, endTime, cubeInstance.getFactTable());
+        List<String> data = StreamingTableDataGenerator.generate(10000, startTime, endTime, cubeInstance.getFactTable());
+        List<String> data2 = StreamingTableDataGenerator.generate(10, endTime, endTime + 300000, cubeInstance.getFactTable());
         TableDesc tableDesc = cubeInstance.getFactTableDesc();
         //load into kafka
         streamDataLoader.loadIntoKafka(data);
+        streamDataLoader.loadIntoKafka(data2);
         logger.info("Write {} messages into {}", data.size(), streamDataLoader.toString());
 
         //csv data for H2 use
@@ -163,7 +165,7 @@ public class DeployUtil {
             sb.append(StringUtils.join(rowColumns, ","));
             sb.append(System.getProperty("line.separator"));
         }
-        appendFactTableData(sb.toString(), cubeInstance.getFactTable());
+        overrideFactTableData(sb.toString(), cubeInstance.getFactTable());
     }
 
     public static void overrideFactTableData(String factTableContent, String factTableName) throws IOException {
@@ -177,33 +179,6 @@ public class DeployUtil {
         in.close();
     }
 
-    public static void appendFactTableData(String factTableContent, String factTableName) throws IOException {
-        // Write to resource store
-        ResourceStore store = ResourceStore.getStore(config());
-
-        InputStream in = new ByteArrayInputStream(factTableContent.getBytes("UTF-8"));
-        String factTablePath = "/data/" + factTableName + ".csv";
-
-        File tmpFile = File.createTempFile(factTableName, "csv");
-        FileOutputStream out = new FileOutputStream(tmpFile);
-
-        try {
-            if (store.exists(factTablePath)) {
-                InputStream oldContent = store.getResource(factTablePath).inputStream;
-                IOUtils.copy(oldContent, out);
-            }
-            IOUtils.copy(in, out);
-            IOUtils.closeQuietly(in);
-
-            store.deleteResource(factTablePath);
-            in = new FileInputStream(tmpFile);
-            store.putResource(factTablePath, in, System.currentTimeMillis());
-        } finally {
-            IOUtils.closeQuietly(out);
-            IOUtils.closeQuietly(in);
-        }
-
-    }
 
     private static void deployHiveTables() throws Exception {
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
deleted file mode 100644
index a5132af..0000000
--- a/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.streaming;
-
-import java.util.List;
-import java.util.Properties;
-
-import javax.annotation.Nullable;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kylin.source.kafka.config.BrokerConfig;
-import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-
-import org.apache.kylin.source.kafka.util.KafkaClient;
-
-/**
- * Load prepared data into kafka(for test use)
- */
-public class Kafka10DataLoader extends StreamDataLoader {
-    private static final Logger logger = LoggerFactory.getLogger(Kafka10DataLoader.class);
-    List<KafkaClusterConfig> kafkaClusterConfigs;
-
-    public Kafka10DataLoader(KafkaConfig kafkaConfig) {
-        super(kafkaConfig);
-        this.kafkaClusterConfigs = kafkaConfig.getKafkaClusterConfigs();
-    }
-
-    public void loadIntoKafka(List<String> messages) {
-
-        KafkaClusterConfig clusterConfig = kafkaClusterConfigs.get(0);
-        String brokerList = StringUtils.join(Collections2.transform(clusterConfig.getBrokerConfigs(), new Function<BrokerConfig, String>() {
-            @Nullable
-            @Override
-            public String apply(BrokerConfig brokerConfig) {
-                return brokerConfig.getHost() + ":" + brokerConfig.getPort();
-            }
-        }), ",");
-
-        Properties props = new Properties();
-        props.put("acks", "1");
-        props.put("retry.backoff.ms", "1000");
-        KafkaProducer producer = KafkaClient.getKafkaProducer(brokerList, props);
-
-        int boundary = messages.size() / 10;
-        for (int i = 0; i < messages.size(); ++i) {
-            ProducerRecord<String, String> keyedMessage = new ProducerRecord<String, String>(clusterConfig.getTopic(), String.valueOf(i), messages.get(i));
-            producer.send(keyedMessage);
-            if (i % boundary == 0) {
-                logger.info("sending " + i + " messages to " + this.toString());
-            }
-        }
-        logger.info("sent " + messages.size() + " messages to " + this.toString());
-        producer.close();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index fafb1fc..79ee084 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -717,7 +717,6 @@ abstract public class KylinConfigBase implements Serializable {
         Map<Integer, String> r = convertKeyToInteger(getPropertiesByPrefix("kylin.source.engine."));
         // ref constants in ISourceAware
         r.put(0, "org.apache.kylin.source.hive.HiveSource");
-        r.put(1, "org.apache.kylin.source.kafka.KafkaSource");
         return r;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index afb0d28..79397c3 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -37,7 +37,6 @@ import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.IBuildable;
-import org.apache.kylin.metadata.model.ISegment;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.IRealization;

http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/core-cube/src/main/java/org/apache/kylin/cube/ISegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/ISegment.java b/core-cube/src/main/java/org/apache/kylin/cube/ISegment.java
new file mode 100644
index 0000000..2e1f214
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/ISegment.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube;
+
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+
+public interface ISegment {
+
+    public String getName();
+
+    public long getDateRangeStart();
+
+    public long getDateRangeEnd();
+
+    public long getSourceOffsetStart();
+
+    public long getSourceOffsetEnd();
+    
+    public DataModelDesc getModel();
+
+    public SegmentStatusEnum getStatus();
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/core-cube/src/main/java/org/apache/kylin/cube/gridtable/SegmentGTStartAndEnd.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/SegmentGTStartAndEnd.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/SegmentGTStartAndEnd.java
index 889a0b2..e31111d 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/SegmentGTStartAndEnd.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/SegmentGTStartAndEnd.java
@@ -24,7 +24,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.cube.ISegment;
 import org.apache.kylin.dimension.AbstractDateDimEnc;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.metadata.datatype.DataType;

http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
index 6ca89c8..6aeb617 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
@@ -26,7 +26,6 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.ISegment;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 
@@ -163,9 +162,4 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
         return cubeDesc.getDistributedByColumn();
     }
 
-    @Override
-    public ISegment getSegment() {
-        return cubeSegment;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
index 8af2297..5212859 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
@@ -25,7 +25,6 @@ import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.ISegment;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 
@@ -138,9 +137,4 @@ public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc {
         return flatDesc.getDistributedBy();
     }
 
-    @Override
-    public ISegment getSegment() {
-        return flatDesc.getSegment();
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java b/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java
index 4213cf3..bb7503a 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java
@@ -24,7 +24,7 @@ import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.util.Iterator;
 
-import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.cube.ISegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
index ffa2680..f3a4107 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
@@ -37,6 +37,4 @@ public interface IJoinedFlatTableDesc {
     long getSourceOffsetEnd();
     
     TblColRef getDistributedBy();
-
-    ISegment getSegment();
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java
deleted file mode 100644
index f69ae3f..0000000
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *  
- *     http://www.apache.org/licenses/LICENSE-2.0
- *  
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.metadata.model;
-
-public interface ISegment {
-
-    public String getName();
-
-    public long getDateRangeStart();
-
-    public long getDateRangeEnd();
-
-    public long getSourceOffsetStart();
-
-    public long getSourceOffsetEnd();
-    
-    public DataModelDesc getModel();
-
-    public SegmentStatusEnum getStatus();
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
index badf628..129d525 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -34,12 +34,10 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
     private static final Logger logger = LoggerFactory.getLogger(BatchMergeJobBuilder2.class);
 
     private final IMROutput2.IMRBatchMergeOutputSide2 outputSide;
-    private final IMRInput.IMRBatchMergeInputSide inputSide;
 
     public BatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) {
         super(mergeSegment, submitter);
         this.outputSide = MRUtil.getBatchMergeOutputSide2(seg);
-        this.inputSide = MRUtil.getBatchMergeInputSide(seg);
     }
 
     public CubingJob build() {
@@ -57,7 +55,6 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
         }
 
         // Phase 1: Merge Dictionary
-        inputSide.addStepPhase1_MergeDictionary(result);
         result.addTask(createMergeDictionaryStep(mergingSegmentIds));
         result.addTask(createMergeStatisticsStep(cubeSegment, mergingSegmentIds, getStatisticsPath(jobId)));
         outputSide.addStepPhase1_MergeDictionary(result);

http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
index 62cede9..582052f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
@@ -21,7 +21,6 @@ package org.apache.kylin.engine.mr;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.ISegment;
 import org.apache.kylin.metadata.model.TableDesc;
 
 /**
@@ -35,9 +34,6 @@ public interface IMRInput {
     /** Return an InputFormat that reads from specified table. */
     public IMRTableInputFormat getTableInputFormat(TableDesc table);
 
-    /** Return a helper to participate in batch cubing merge job flow. */
-    public IMRBatchMergeInputSide getBatchMergeInputSide(ISegment seg);
-
     /**
      * Utility that configures mapper to read from a table.
      */
@@ -71,10 +67,4 @@ public interface IMRInput {
         public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
     }
 
-    public interface IMRBatchMergeInputSide {
-
-        /** Add step that executes before merge dictionary and before merge cube. */
-        public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
-
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
index 67eef5e..2c3b77f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -71,10 +71,6 @@ public class MRUtil {
         return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchMergeOutputSide(seg);
     }
 
-    public static IMRInput.IMRBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) {
-        return SourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchMergeInputSide(seg);
-    }
-
     // use this method instead of ToolRunner.run() because ToolRunner.run() is not thread-sale
     // Refer to: http://stackoverflow.com/questions/22462665/is-hadoops-toorunner-thread-safe
     public static int runMRJob(Tool tool, String[] args) throws Exception {

http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json b/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json
index e6977e1..cfb889a 100644
--- a/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json
+++ b/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json
@@ -4,7 +4,7 @@
   "name": "test_streaming_table_model_desc",
   "dimensions": [
     {
-      "table": "DEFAULT.STREAMING_TABLE",
+      "table": "default.streaming_table",
       "columns": [
         "minute_start",
         "hour_start",
@@ -20,10 +20,10 @@
     "item_count"
   ],
   "last_modified": 0,
-  "fact_table": "DEFAULT.STREAMING_TABLE",
+  "fact_table": "default.streaming_table",
   "filter_condition": null,
   "partition_desc": {
-    "partition_date_column": "DEFAULT.STREAMING_TABLE.minute_start",
+    "partition_date_column": "default.streaming_table.minute_start",
     "partition_date_start": 0,
     "partition_type": "APPEND"
   }

http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 7f79acc..9490560 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,36 +20,24 @@ package org.apache.kylin.provision;
 
 import java.io.File;
 import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.TimeZone;
 import java.util.UUID;
 
-import org.I0Itec.zkclient.ZkConnection;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.streaming.OneOffStreamingBuilder;
 import org.apache.kylin.engine.streaming.StreamingConfig;
 import org.apache.kylin.engine.streaming.StreamingManager;
 import org.apache.kylin.job.DeployUtil;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
-import org.apache.kylin.job.manager.ExecutableManager;
-import org.apache.kylin.job.streaming.Kafka10DataLoader;
+import org.apache.kylin.job.streaming.KafkaDataLoader;
+import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.source.kafka.KafkaConfigManager;
-import org.apache.kylin.source.kafka.config.BrokerConfig;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
-import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
-import org.junit.Assert;
+import org.apache.kylin.storage.hbase.util.StorageCleanupJob;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,123 +46,31 @@ import org.slf4j.LoggerFactory;
  */
 public class BuildCubeWithStream {
 
-    private static final Logger logger = LoggerFactory.getLogger(org.apache.kylin.provision.BuildCubeWithStream.class);
-
-    private CubeManager cubeManager;
-    private DefaultScheduler scheduler;
-    protected ExecutableManager jobService;
+    private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStream.class);
     private static final String cubeName = "test_streaming_table_cube";
+    private static final long startTime = DateFormat.stringToMillis("2015-01-01 00:00:00");
+    private static final long endTime = DateFormat.stringToMillis("2015-01-03 00:00:00");
+    private static final long batchInterval = 16 * 60 * 60 * 1000;//16 hours
 
-    private KafkaConfig kafkaConfig;
-    private MockKafka kafkaServer;
-
-    public void before() throws Exception {
-        deployEnv();
-
-        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        jobService = ExecutableManager.getInstance(kylinConfig);
-        scheduler = DefaultScheduler.createInstance();
-        scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock());
-        if (!scheduler.hasStarted()) {
-            throw new RuntimeException("scheduler has not been started");
-        }
-        cubeManager = CubeManager.getInstance(kylinConfig);
-
-        final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
-        final String factTable = cubeInstance.getFactTable();
-
-        final StreamingManager streamingManager = StreamingManager.getInstance(kylinConfig);
-        final StreamingConfig streamingConfig = streamingManager.getStreamingConfig(factTable);
-        kafkaConfig = KafkaConfigManager.getInstance(kylinConfig).getKafkaConfig(streamingConfig.getName());
-
-        String topicName = UUID.randomUUID().toString();
-        String localIp = NetworkUtils.getLocalIp();
-        BrokerConfig brokerConfig = kafkaConfig.getKafkaClusterConfigs().get(0).getBrokerConfigs().get(0);
-        brokerConfig.setHost(localIp);
-        kafkaConfig.setTopic(topicName);
-        KafkaConfigManager.getInstance(kylinConfig).saveKafkaConfig(kafkaConfig);
-
-        startEmbeddedKafka(topicName, brokerConfig);
-    }
-
-    private void startEmbeddedKafka(String topicName, BrokerConfig brokerConfig) {
-        //Start mock Kakfa
-        String zkConnectionStr = "sandbox:2181";
-        ZkConnection zkConnection = new ZkConnection(zkConnectionStr);
-        // Assert.assertEquals(ZooKeeper.States.CONNECTED, zkConnection.getZookeeperState());
-        kafkaServer = new MockKafka(zkConnection, brokerConfig.getPort(), brokerConfig.getId());
-        kafkaServer.start();
-
-        kafkaServer.createTopic(topicName, 3, 1);
-        kafkaServer.waitTopicUntilReady(topicName);
-
-        MetadataResponse.TopicMetadata topicMetadata = kafkaServer.fetchTopicMeta(topicName);
-        Assert.assertEquals(topicName, topicMetadata.topic());
-    }
-
-    private void generateStreamData(long startTime, long endTime, int numberOfRecords) throws IOException {
-        Kafka10DataLoader dataLoader = new Kafka10DataLoader(kafkaConfig);
-        DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, numberOfRecords, cubeName, dataLoader);
-        logger.info("Test data inserted into Kafka");
-    }
+    private KylinConfig kylinConfig;
 
-    private void clearSegment(String cubeName) throws Exception {
-        CubeInstance cube = cubeManager.getCube(cubeName);
-        // remove all existing segments
-        CubeUpdate cubeBuilder = new CubeUpdate(cube);
-        cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()]));
-        cubeManager.updateCube(cubeBuilder);
-    }
-
-    public void build() throws Exception {
-        clearSegment(cubeName);
-        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
-        f.setTimeZone(TimeZone.getTimeZone("GMT"));
-        long date1 = 0;
-        long date2 = f.parse("2013-01-01").getTime();
-
-        int numberOfRecrods1 = 10000;
-        generateStreamData(date1, date2, numberOfRecrods1);
-        buildSegment(cubeName, 0, Long.MAX_VALUE);
-
-        long date3 = f.parse("2013-04-01").getTime();
-        int numberOfRecrods2 = 5000;
-        generateStreamData(date2, date3, numberOfRecrods2);
-        buildSegment(cubeName, 0, Long.MAX_VALUE);
-
-        //merge
-        mergeSegment(cubeName, 0, 15000);
-
-    }
-
-    private String mergeSegment(String cubeName, long startOffset, long endOffset) throws Exception {
-        CubeSegment segment = cubeManager.mergeSegments(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, true);
-        DefaultChainedExecutable job = EngineFactory.createBatchMergeJob(segment, "TEST");
-        jobService.addJob(job);
-        waitForJob(job.getId());
-        return job.getId();
-    }
+    public static void main(String[] args) throws Exception {
 
-    private String refreshSegment(String cubeName, long startOffset, long endOffset) throws Exception {
-        CubeSegment segment = cubeManager.refreshSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset);
-        DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
-        jobService.addJob(job);
-        waitForJob(job.getId());
-        return job.getId();
-    }
+        try {
+            beforeClass();
 
-    private String buildSegment(String cubeName, long startOffset, long endOffset) throws Exception {
-        CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset);
-        DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
-        jobService.addJob(job);
-        waitForJob(job.getId());
-        return job.getId();
-    }
+            BuildCubeWithStream buildCubeWithStream = new BuildCubeWithStream();
+            buildCubeWithStream.before();
+            buildCubeWithStream.build();
+            logger.info("Build is done");
+            buildCubeWithStream.cleanup();
+            logger.info("Going to exit");
+            System.exit(0);
+        } catch (Exception e) {
+            logger.error("error", e);
+            System.exit(1);
+        }
 
-    protected void deployEnv() throws IOException {
-        DeployUtil.overrideJobJarLocations();
-        //DeployUtil.initCliWorkDir();
-        //DeployUtil.deployMetadata();
     }
 
     public static void beforeClass() throws Exception {
@@ -187,54 +83,44 @@ public class BuildCubeWithStream {
         HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
     }
 
-    public static void afterClass() throws Exception {
-        HBaseMetadataTestCase.staticCleanupTestMetadata();
+    protected void deployEnv() throws IOException {
+        DeployUtil.overrideJobJarLocations();
     }
 
-    public void after() {
-        kafkaServer.stop();
-    }
+    public void before() throws Exception {
+        deployEnv();
 
-    protected void waitForJob(String jobId) {
-        while (true) {
-            AbstractExecutable job = jobService.getJob(jobId);
-            if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR) {
-                break;
-            } else {
-                try {
-                    Thread.sleep(5000);
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-    }
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+        final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+        final String factTable = cubeInstance.getFactTable();
+        final StreamingConfig config = StreamingManager.getInstance(kylinConfig).getStreamingConfig(factTable);
 
-    public static void main(String[] args) throws Exception {
-        try {
-            beforeClass();
+        //Use a random topic for kafka data stream
+        KafkaConfig streamingConfig = KafkaConfigManager.getInstance(kylinConfig).getKafkaConfig(config.getName());
+        streamingConfig.setTopic(UUID.randomUUID().toString());
+        KafkaConfigManager.getInstance(kylinConfig).saveKafkaConfig(streamingConfig);
 
-            BuildCubeWithStream buildCubeWithStream = new BuildCubeWithStream();
-            buildCubeWithStream.before();
-            buildCubeWithStream.build();
-            logger.info("Build is done");
-            buildCubeWithStream.after();
-            afterClass();
-            logger.info("Going to exit");
-            System.exit(0);
-        } catch (Exception e) {
-            logger.error("error", e);
-            System.exit(1);
-        }
+        DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, cubeName, new KafkaDataLoader(streamingConfig));
+    }
 
+    public void cleanup() throws Exception {
+        cleanupOldStorage();
+        HBaseMetadataTestCase.staticCleanupTestMetadata();
     }
 
     protected int cleanupOldStorage() throws Exception {
         String[] args = { "--delete", "true" };
 
-        //        KapStorageCleanupCLI cli = new KapStorageCleanupCLI();
-        //        cli.execute(args);
+        StorageCleanupJob cli = new StorageCleanupJob();
+        cli.execute(args);
         return 0;
     }
 
+    public void build() throws Exception {
+        logger.info("start time:" + startTime + " end time:" + endTime + " batch interval:" + batchInterval + " batch count:" + ((endTime - startTime) / batchInterval));
+        for (long start = startTime; start < endTime; start += batchInterval) {
+            logger.info(String.format("build batch:{%d, %d}", start, start + batchInterval));
+            new OneOffStreamingBuilder(RealizationType.CUBE, cubeName, start, start + batchInterval).build().run();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/kylin-it/src/test/java/org/apache/kylin/provision/MockKafka.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/MockKafka.java b/kylin-it/src/test/java/org/apache/kylin/provision/MockKafka.java
deleted file mode 100644
index 3f47923..0000000
--- a/kylin-it/src/test/java/org/apache/kylin/provision/MockKafka.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.provision;
-
-import java.io.UnsupportedEncodingException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-import java.util.Random;
-import java.util.UUID;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
-import org.I0Itec.zkclient.exception.ZkMarshallingError;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-import org.apache.kafka.common.requests.MetadataResponse;
-
-import kafka.admin.AdminUtils;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
-import kafka.utils.ZkUtils;
-
-public class MockKafka {
-    private static Properties createProperties(ZkConnection zkServerConnection, String logDir, String port, String brokerId) {
-        Properties properties = new Properties();
-        properties.put("port", port);
-        properties.put("broker.id", brokerId);
-        properties.put("log.dirs", logDir);
-        properties.put("host.name", "localhost");
-        properties.put("offsets.topic.replication.factor", "1");
-        properties.put("delete.topic.enable", "true");
-        properties.put("zookeeper.connect", zkServerConnection.getServers());
-        String ip = NetworkUtils.getLocalIp();
-        properties.put("listeners", "PLAINTEXT://" + ip + ":" + port);
-        properties.put("advertised.listeners", "PLAINTEXT://" + ip + ":" + port);
-        return properties;
-    }
-
-    private KafkaServerStartable kafkaServer;
-
-    private ZkConnection zkConnection;
-
-    public MockKafka(ZkConnection zkServerConnection) {
-        this(zkServerConnection, System.getProperty("java.io.tmpdir") + "/" + UUID.randomUUID().toString(), "9092", "1");
-        start();
-    }
-
-    private MockKafka(Properties properties) {
-        KafkaConfig kafkaConfig = new KafkaConfig(properties);
-        kafkaServer = new KafkaServerStartable(kafkaConfig);
-    }
-
-    public MockKafka(ZkConnection zkServerConnection, int port, int brokerId) {
-        this(zkServerConnection, System.getProperty("java.io.tmpdir") + "/" + UUID.randomUUID().toString(), String.valueOf(port), String.valueOf(brokerId));
-        start();
-    }
-
-    private MockKafka(ZkConnection zkServerConnection, String logDir, String port, String brokerId) {
-        this(createProperties(zkServerConnection, logDir, port, brokerId));
-        this.zkConnection = zkServerConnection;
-        System.out.println(String.format("Kafka %s:%s dir:%s", kafkaServer.serverConfig().brokerId(), kafkaServer.serverConfig().port(), kafkaServer.serverConfig().logDirs()));
-    }
-
-    public void createTopic(String topic, int partition, int replication) {
-        ZkClient zkClient = new ZkClient(zkConnection);
-        ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
-        zkClient.setZkSerializer(new ZKStringSerializer());
-        AdminUtils.createTopic(zkUtils, topic, partition, replication, new Properties(), null);
-        zkClient.close();
-    }
-
-    public void createTopic(String topic) {
-        this.createTopic(topic, 1, 1);
-    }
-
-    public MetadataResponse.TopicMetadata fetchTopicMeta(String topic) {
-        ZkClient zkClient = new ZkClient(zkConnection);
-        ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
-        zkClient.setZkSerializer(new ZKStringSerializer());
-        MetadataResponse.TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils);
-        zkClient.close();
-        return topicMetadata;
-    }
-
-    /**
-     * Delete may not work
-     *
-     * @param topic
-     */
-    public void deleteTopic(String topic) {
-        ZkClient zkClient = new ZkClient(zkConnection);
-        ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
-        zkClient.setZkSerializer(new ZKStringSerializer());
-        AdminUtils.deleteTopic(zkUtils, topic);
-        zkClient.close();
-    }
-
-    public String getConnectionString() {
-        return String.format("%s:%d", kafkaServer.serverConfig().hostName(), kafkaServer.serverConfig().port());
-    }
-
-    public void start() {
-        kafkaServer.startup();
-        System.out.println("embedded kafka is up");
-    }
-
-    public void stop() {
-        kafkaServer.shutdown();
-        System.out.println("embedded kafka down");
-    }
-
-    public MetadataResponse.TopicMetadata waitTopicUntilReady(String topic) {
-        boolean isReady = false;
-        MetadataResponse.TopicMetadata topicMeta = null;
-        while (!isReady) {
-            Random random = new Random();
-            topicMeta = this.fetchTopicMeta(topic);
-            List<MetadataResponse.PartitionMetadata> partitionsMetadata = topicMeta.partitionMetadata();
-            Iterator<MetadataResponse.PartitionMetadata> iterator = partitionsMetadata.iterator();
-            boolean hasGotLeader = true;
-            boolean hasGotReplica = true;
-            while (iterator.hasNext()) {
-                MetadataResponse.PartitionMetadata partitionMeta = iterator.next();
-                hasGotLeader &= (!partitionMeta.leader().isEmpty());
-                if (partitionMeta.leader().isEmpty()) {
-                    System.out.println("Partition leader is not ready, wait 1s.");
-                    break;
-                }
-                hasGotReplica &= (!partitionMeta.replicas().isEmpty());
-                if (partitionMeta.replicas().isEmpty()) {
-                    System.out.println("Partition replica is not ready, wait 1s.");
-                    break;
-                }
-            }
-            isReady = hasGotLeader & hasGotReplica;
-            if (!isReady) {
-                try {
-                    Thread.sleep(1000);
-                } catch (InterruptedException e) {
-                }
-            }
-        }
-        return topicMeta;
-    }
-
-    public String getZookeeperConnection() {
-        return this.zkConnection.getServers();
-    }
-}
-
-class ZKStringSerializer implements ZkSerializer {
-
-    @Override
-    public byte[] serialize(Object data) throws ZkMarshallingError {
-        byte[] bytes = null;
-        try {
-            bytes = data.toString().getBytes("UTF-8");
-        } catch (UnsupportedEncodingException e) {
-            throw new ZkMarshallingError(e);
-        }
-        return bytes;
-    }
-
-    @Override
-    public Object deserialize(byte[] bytes) throws ZkMarshallingError {
-        if (bytes == null)
-            return null;
-        else
-            try {
-                return new String(bytes, "UTF-8");
-            } catch (UnsupportedEncodingException e) {
-                throw new ZkMarshallingError(e);
-            }
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/kylin-it/src/test/java/org/apache/kylin/provision/NetworkUtils.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/NetworkUtils.java b/kylin-it/src/test/java/org/apache/kylin/provision/NetworkUtils.java
deleted file mode 100644
index 98f6d04..0000000
--- a/kylin-it/src/test/java/org/apache/kylin/provision/NetworkUtils.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.provision;
-
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
-import java.util.Enumeration;
-
-public class NetworkUtils {
-
-    public static String getLocalIp() {
-        try {
-            Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
-            while (interfaces.hasMoreElements()) {
-                NetworkInterface iface = interfaces.nextElement();
-                if (iface.isLoopback() || !iface.isUp() || iface.isVirtual() || iface.isPointToPoint())
-                    continue;
-                if (iface.getName().startsWith("vboxnet"))
-                    continue;
-
-                Enumeration<InetAddress> addresses = iface.getInetAddresses();
-                while (addresses.hasMoreElements()) {
-                    InetAddress addr = addresses.nextElement();
-                    final String ip = addr.getHostAddress();
-                    if (Inet4Address.class == addr.getClass())
-                        return ip;
-                }
-            }
-        } catch (SocketException e) {
-            throw new RuntimeException(e);
-        }
-        return null;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 30d3324..1abc4eb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,7 +55,7 @@
 
         <!-- HBase versions -->
         <hbase-hadoop2.version>0.98.8-hadoop2</hbase-hadoop2.version>
-        <kafka.version>0.10.0.0</kafka.version>
+        <kafka.version>0.8.1</kafka.version>
 
         <!-- Hadoop deps, keep compatible with hadoop2.version -->
         <zookeeper.version>3.4.6</zookeeper.version>

http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 09ac522..520d7cc 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -50,7 +50,6 @@ import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.ISegment;
 import org.apache.kylin.metadata.model.LookupDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.slf4j.Logger;
@@ -70,16 +69,6 @@ public class HiveMRInput implements IMRInput {
         return new HiveTableInputFormat(table.getIdentity());
     }
 
-    @Override
-    public IMRBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
-        return new IMRBatchMergeInputSide() {
-            @Override
-            public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
-                // doing nothing
-            }
-        };
-    }
-
     public static class HiveTableInputFormat implements IMRTableInputFormat {
         final String dbName;
         final String tableName;

http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/source-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/source-kafka/pom.xml b/source-kafka/pom.xml
index 212f4c6..90c2211 100644
--- a/source-kafka/pom.xml
+++ b/source-kafka/pom.xml
@@ -32,11 +32,10 @@
 
     </parent>
 
+    <properties>
+    </properties>
+
     <dependencies>
-        <dependency>
-            <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-engine-mr</artifactId>
-        </dependency>
 
         <dependency>
             <groupId>org.apache.kylin</groupId>
@@ -61,10 +60,16 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.hive.hcatalog</groupId>
+            <artifactId>hive-hcatalog-core</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
     </dependencies>
 
+
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
index cfdf316..d594873 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
@@ -1,19 +1,35 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
  */
 
 package org.apache.kylin.source.kafka;

http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
deleted file mode 100644
index cfce137..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.source.kafka;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import org.apache.kylin.metadata.model.ISegment;
-import org.apache.kylin.source.kafka.hadoop.KafkaFlatTableJob;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.StreamingMessage;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.engine.mr.IMRInput;
-import org.apache.kylin.engine.mr.JobBuilderSupport;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.common.MapReduceExecutable;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.job.JoinedFlatTable;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-
-public class KafkaMRInput implements IMRInput {
-
-    CubeSegment cubeSegment;
-
-    @Override
-    public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
-        this.cubeSegment = (CubeSegment)flatDesc.getSegment();
-        return new BatchCubingInputSide(cubeSegment);
-    }
-
-    @Override
-    public IMRTableInputFormat getTableInputFormat(TableDesc table) {
-        KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv());
-        KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(table.getIdentity());
-        List<TblColRef> columns = Lists.transform(Arrays.asList(table.getColumns()), new Function<ColumnDesc, TblColRef>() {
-            @Nullable
-            @Override
-            public TblColRef apply(ColumnDesc input) {
-                return input.getRef();
-            }
-        });
-
-        return new KafkaTableInputFormat(cubeSegment, columns, kafkaConfig, null);
-    }
-
-    @Override
-    public IMRBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
-        return new KafkaMRBatchMergeInputSide((CubeSegment) seg);
-    }
-
-    public static class KafkaTableInputFormat implements IMRTableInputFormat {
-        private final CubeSegment cubeSegment;
-        private List<TblColRef> columns;
-        private StreamingParser streamingParser;
-        private KafkaConfig kafkaConfig;
-        private final JobEngineConfig conf;
-
-        public KafkaTableInputFormat(CubeSegment cubeSegment, List<TblColRef> columns, KafkaConfig kafkaConfig, JobEngineConfig conf) {
-            this.cubeSegment = cubeSegment;
-            this.columns = columns;
-            this.kafkaConfig = kafkaConfig;
-            this.conf = conf;
-        }
-
-        @Override
-        public void configureJob(Job job) {
-            job.setInputFormatClass(SequenceFileInputFormat.class);
-            job.setMapOutputValueClass(Text.class);
-            String jobId = job.getConfiguration().get(BatchConstants.ARG_CUBING_JOB_ID);
-            IJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(cubeSegment);
-            String inputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
-            try {
-                FileInputFormat.addInputPath(job, new Path(inputPath));
-            } catch (IOException e) {
-                throw new IllegalStateException(e);
-            }
-        }
-
-        @Override
-        public String[] parseMapperInput(Object mapperInput) {
-            if (streamingParser == null) {
-                try {
-                    streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getParserProperties(), columns);
-                } catch (ReflectiveOperationException e) {
-                    throw new IllegalArgumentException();
-                }
-            }
-            Text text = (Text) mapperInput;
-            ByteBuffer buffer = ByteBuffer.wrap(text.getBytes(), 0, text.getLength()).slice();
-            StreamingMessage streamingMessage = streamingParser.parse(buffer);
-            return streamingMessage.getData().toArray(new String[streamingMessage.getData().size()]);
-        }
-
-    }
-
-    public static class BatchCubingInputSide implements IMRBatchCubingInputSide {
-
-        final JobEngineConfig conf;
-        final CubeSegment seg;
-        private String outputPath;
-
-        public BatchCubingInputSide(CubeSegment seg) {
-            this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
-            this.seg = seg;
-        }
-
-        @Override
-        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
-            jobFlow.addTask(createUpdateSegmentOffsetStep(jobFlow.getId()));
-            jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId()));
-        }
-
-        public SeekOffsetStep createUpdateSegmentOffsetStep(String jobId) {
-            final SeekOffsetStep result = new SeekOffsetStep();
-            result.setName("Seek and update offset step");
-
-            CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
-            CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
-            CubingExecutableUtil.setCubingJobId(jobId, result.getParams());
-
-            return result;
-        }
-
-        private MapReduceExecutable createSaveKafkaDataStep(String jobId) {
-            MapReduceExecutable result = new MapReduceExecutable();
-
-            IJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg);
-            outputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
-            result.setName("Save data from Kafka");
-            result.setMapReduceJobClass(KafkaFlatTableJob.class);
-            JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(seg, "system");
-            StringBuilder cmd = new StringBuilder();
-            jobBuilderSupport.appendMapReduceParameters(cmd);
-            JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
-            JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
-            JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
-            JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Save_Kafka_Data_" + seg.getRealization().getName() + "_Step");
-
-            result.setMapReduceParams(cmd.toString());
-            return result;
-        }
-
-        @Override
-        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
-            final UpdateTimeRangeStep result = new UpdateTimeRangeStep();
-            result.setName("Update Segment Time Range");
-            CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
-            CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
-            CubingExecutableUtil.setCubingJobId(jobFlow.getId(), result.getParams());
-            JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(seg, "SYSTEM");
-            result.getParams().put(BatchConstants.CFG_OUTPUT_PATH, jobBuilderSupport.getFactDistinctColumnsPath(jobFlow.getId()));
-            jobFlow.addTask(result);
-
-        }
-
-        @Override
-        public IMRTableInputFormat getFlatTableInputFormat() {
-            KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv());
-            KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(seg.getRealization().getFactTable());
-            List<TblColRef> columns = new CubeJoinedFlatTableDesc(seg).getAllColumns();
-
-            return new KafkaTableInputFormat(seg, columns, kafkaConfig, conf);
-
-        }
-
-    }
-
-    class KafkaMRBatchMergeInputSide implements IMRBatchMergeInputSide {
-
-        private CubeSegment cubeSegment;
-
-        KafkaMRBatchMergeInputSide(CubeSegment cubeSegment) {
-            this.cubeSegment = cubeSegment;
-        }
-
-        @Override
-        public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
-
-            final MergeOffsetStep result = new MergeOffsetStep();
-            result.setName("Merge offset step");
-
-            CubingExecutableUtil.setCubeName(cubeSegment.getRealization().getName(), result.getParams());
-            CubingExecutableUtil.setSegmentId(cubeSegment.getUuid(), result.getParams());
-            CubingExecutableUtil.setCubingJobId(jobFlow.getId(), result.getParams());
-            jobFlow.addTask(result);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
deleted file mode 100644
index d039583..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source.kafka;
-
-import com.google.common.collect.Lists;
-import org.apache.kylin.engine.mr.IMRInput;
-import org.apache.kylin.engine.streaming.StreamingConfig;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.ISource;
-import org.apache.kylin.source.ReadableTable;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
-
-import java.util.List;
-
-//used by reflection
-public class KafkaSource implements ISource {
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public <I> I adaptToBuildEngine(Class<I> engineInterface) {
-        if (engineInterface == IMRInput.class) {
-            return (I) new KafkaMRInput();
-        } else {
-            throw new RuntimeException("Cannot adapt to " + engineInterface);
-        }
-    }
-
-    @Override
-    public ReadableTable createReadableTable(TableDesc tableDesc) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public List<String> getMRDependentResources(TableDesc table) {
-        List<String> dependentResources = Lists.newArrayList();
-        dependentResources.add(KafkaConfig.concatResourcePath(table.getIdentity()));
-        dependentResources.add(StreamingConfig.concatResourcePath(table.getIdentity()));
-        return dependentResources;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
index de42689..c3bdb75 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
@@ -1,20 +1,36 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
 package org.apache.kylin.source.kafka;
 
 import java.util.List;
@@ -24,9 +40,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
-import com.google.common.base.Function;
-import kafka.cluster.BrokerEndPoint;
-import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.StreamingBatch;
@@ -53,8 +66,6 @@ import kafka.javaapi.FetchResponse;
 import kafka.javaapi.PartitionMetadata;
 import kafka.message.MessageAndOffset;
 
-import javax.annotation.Nullable;
-
 @SuppressWarnings("unused")
 public class KafkaStreamingInput implements IStreamingInput {
 
@@ -140,16 +151,8 @@ public class KafkaStreamingInput implements IStreamingInput {
                 if (partitionMetadata.errorCode() != 0) {
                     logger.warn("PartitionMetadata errorCode: " + partitionMetadata.errorCode());
                 }
-                replicaBrokers = Lists.transform(partitionMetadata.replicas(), new Function<BrokerEndPoint, Broker>() {
-                    @Nullable
-                    @Override
-                    public Broker apply(@Nullable BrokerEndPoint brokerEndPoint) {
-                        return new Broker(brokerEndPoint, SecurityProtocol.PLAINTEXT);
-                    }
-                });
-                BrokerEndPoint leaderEndpoint = partitionMetadata.leader();
-
-                return new Broker(leaderEndpoint, SecurityProtocol.PLAINTEXT);
+                replicaBrokers = partitionMetadata.replicas();
+                return partitionMetadata.leader();
             } else {
                 return null;
             }

http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java
deleted file mode 100644
index a21b980..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.source.kafka;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import com.google.common.collect.Maps;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.kylin.source.kafka.util.KafkaOffsetMapping;
-
-/**
- */
-public class MergeOffsetStep extends AbstractExecutable {
-
-    private static final Logger logger = LoggerFactory.getLogger(MergeOffsetStep.class);
-    public MergeOffsetStep() {
-        super();
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
-        final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
-        final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
-
-        List<CubeSegment> mergingSegs = cube.getMergingSegments(segment);
-        Map<Integer, Long> mergedStartOffsets = Maps.newHashMap();
-        Map<Integer, Long> mergedEndOffsets = Maps.newHashMap();
-
-        long dateRangeStart = Long.MAX_VALUE, dateRangeEnd = 0;
-        for (CubeSegment seg: mergingSegs) {
-            Map<Integer, Long> startOffsets = KafkaOffsetMapping.parseOffsetStart(seg);
-            Map<Integer, Long> endOffsets = KafkaOffsetMapping.parseOffsetEnd(seg);
-
-            for (Integer partition : startOffsets.keySet()) {
-                long currentStart = mergedStartOffsets.get(partition) != null ? Long.valueOf(mergedStartOffsets.get(partition)) : Long.MAX_VALUE;
-                long currentEnd = mergedEndOffsets.get(partition) != null ? Long.valueOf(mergedEndOffsets.get(partition)) : 0;
-                mergedStartOffsets.put(partition, Math.min(currentStart, startOffsets.get(partition)));
-                mergedEndOffsets.put(partition, Math.max(currentEnd, endOffsets.get(partition)));
-            }
-            dateRangeStart = Math.min(dateRangeStart, seg.getDateRangeStart());
-            dateRangeEnd = Math.max(dateRangeEnd, seg.getDateRangeEnd());
-        }
-
-        KafkaOffsetMapping.saveOffsetStart(segment, mergedStartOffsets);
-        KafkaOffsetMapping.saveOffsetEnd(segment, mergedEndOffsets);
-        segment.setDateRangeStart(dateRangeStart);
-        segment.setDateRangeEnd(dateRangeEnd);
-
-        CubeUpdate cubeBuilder = new CubeUpdate(cube);
-        cubeBuilder.setToUpdateSegs(segment);
-        try {
-            cubeManager.updateCube(cubeBuilder);
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
-        } catch (IOException e) {
-            logger.error("fail to update cube segment offset", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
deleted file mode 100644
index 5dca93f..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.source.kafka;
-
-import org.apache.kylin.source.kafka.util.KafkaClient;
-import org.apache.kylin.source.kafka.util.KafkaOffsetMapping;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- */
-public class SeekOffsetStep extends AbstractExecutable {
-
-    private static final Logger logger = LoggerFactory.getLogger(SeekOffsetStep.class);
-
-    public SeekOffsetStep() {
-        super();
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
-        final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
-        final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
-
-        Map<Integer, Long> startOffsets = KafkaOffsetMapping.parseOffsetStart(segment);
-        Map<Integer, Long> endOffsets = KafkaOffsetMapping.parseOffsetEnd(segment);
-
-        if (startOffsets.size() > 0 && endOffsets.size() > 0 && startOffsets.size() == endOffsets.size()) {
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, "skipped, as the offset is provided.");
-        }
-
-        final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(context.getConfig()).getKafkaConfig(cube.getFactTable());
-        final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig);
-        final String topic = kafakaConfig.getTopic();
-        try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) {
-            final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
-
-            if (startOffsets.isEmpty()) {
-                // user didn't specify start offset, use the biggest offset in existing segments as start
-                for (CubeSegment seg : cube.getSegments()) {
-                    Map<Integer, Long> segEndOffset = KafkaOffsetMapping.parseOffsetEnd(seg);
-                    for (PartitionInfo partition : partitionInfos) {
-                        int partitionId = partition.partition();
-                        if (segEndOffset.containsKey(partitionId)) {
-                            startOffsets.put(partitionId, Math.max(startOffsets.containsKey(partitionId) ? startOffsets.get(partitionId) : 0, segEndOffset.get(partitionId)));
-                        }
-                    }
-                }
-
-                if (partitionInfos.size() > startOffsets.size()) {
-                    // has new partition added
-                    for (int x = startOffsets.size(); x < partitionInfos.size(); x++) {
-                        long earliest = KafkaClient.getEarliestOffset(consumer, topic, partitionInfos.get(x).partition());
-                        startOffsets.put(partitionInfos.get(x).partition(), earliest);
-                    }
-                }
-
-                logger.info("Get start offset for segment " + segment.getName() + ": " + startOffsets.toString());
-            }
-
-            if (endOffsets.isEmpty()) {
-                // user didn't specify end offset, use latest offset in kafka
-                for (PartitionInfo partitionInfo : partitionInfos) {
-                    long latest = KafkaClient.getLatestOffset(consumer, topic, partitionInfo.partition());
-                    endOffsets.put(partitionInfo.partition(), latest);
-                }
-
-                logger.info("Get end offset for segment " + segment.getName() + ": " + endOffsets.toString());
-            }
-        }
-
-        KafkaOffsetMapping.saveOffsetStart(segment, startOffsets);
-        KafkaOffsetMapping.saveOffsetEnd(segment, endOffsets);
-
-        segment.setName(CubeSegment.makeSegmentName(0, 0, segment.getSourceOffsetStart(), segment.getSourceOffsetEnd()));
-        CubeUpdate cubeBuilder = new CubeUpdate(cube);
-        cubeBuilder.setToUpdateSegs(segment);
-        try {
-            cubeManager.updateCube(cubeBuilder);
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
-        } catch (IOException e) {
-            logger.error("fail to update cube segment offset", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
index 6b7d658..cb6a72b 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
@@ -1,20 +1,37 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
 package org.apache.kylin.source.kafka;
 
 import java.lang.reflect.Constructor;

http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
index 666297f..8888d67 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
@@ -1,20 +1,37 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
 package org.apache.kylin.source.kafka;
 
 import java.nio.ByteBuffer;