You are viewing a plain text version of this content. The canonical link is the commit URL listed in the metadata below.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/28 01:04:54 UTC

[17/50] incubator-kylin git commit: refactor

refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/2b5495ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/2b5495ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/2b5495ce

Branch: refs/heads/streaming-localdict
Commit: 2b5495ce1debe21be361e942428cfff0bd1dff36
Parents: c3ff4f4
Author: qianhao.zhou <qi...@ebay.com>
Authored: Fri Mar 27 10:05:20 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Fri Mar 27 10:05:20 2015 +0800

----------------------------------------------------------------------
 .../invertedindex/index/BatchSliceBuilder.java  |   2 +-
 .../kylin/job/streaming/StreamingBootstrap.java | 117 +++++++++++++++++
 .../kylin/job/streaming/StreamingCLI.java       |  71 ++++++++++
 .../apache/kylin/job/IIStreamBuilderTest.java   |   2 +-
 pom.xml                                         |   1 +
 .../apache/kylin/streaming/KafkaRequester.java  | 128 +++++++++++--------
 .../kylin/streaming/StreamingBootstrap.java     | 109 ----------------
 .../apache/kylin/streaming/StreamingCLI.java    |  71 ----------
 8 files changed, 265 insertions(+), 236 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2b5495ce/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BatchSliceBuilder.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BatchSliceBuilder.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BatchSliceBuilder.java
index 6ba328c..037dd6c 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BatchSliceBuilder.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BatchSliceBuilder.java
@@ -87,7 +87,7 @@ public class BatchSliceBuilder {
 
     private long increaseSliceTimestamp(long timestamp) {
         if (timestamp <= sliceTimestamp) {
-            return ++timestamp; // ensure slice timestamp increases
+            return sliceTimestamp+1; // ensure slice timestamp increases
         } else {
             return timestamp;
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2b5495ce/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
new file mode 100644
index 0000000..ddaae29
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -0,0 +1,117 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.job.streaming;
+
+import com.google.common.base.Preconditions;
+import kafka.api.OffsetRequest;
+import kafka.cluster.Broker;
+import kafka.javaapi.PartitionMetadata;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
+import org.apache.kylin.streaming.*;
+import org.apache.kylin.streaming.invertedindex.IIStreamBuilder;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * Created by qianzhou on 3/26/15.
+ */
+public class StreamingBootstrap {
+
+    private KylinConfig kylinConfig;
+    private StreamManager streamManager;
+    private IIManager iiManager;
+
+    public static StreamingBootstrap getInstance(KylinConfig kylinConfig) {
+        return new StreamingBootstrap(kylinConfig);
+    }
+
+    private StreamingBootstrap(KylinConfig kylinConfig) {
+        this.kylinConfig = kylinConfig;
+        this.streamManager = StreamManager.getInstance(kylinConfig);
+        this.iiManager = IIManager.getInstance(kylinConfig);
+    }
+
+    private static Broker getLeadBroker(KafkaConfig kafkaConfig, int partitionId) {
+        final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaConfig.getTopic(), partitionId, kafkaConfig.getBrokers(), kafkaConfig);
+        if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
+            return partitionMetadata.leader();
+        } else {
+            return null;
+        }
+    }
+
+    public void startStreaming(String streamingConf, int partitionId) throws Exception {
+        final KafkaConfig kafkaConfig = streamManager.getKafkaConfig(streamingConf);
+        Preconditions.checkArgument(kafkaConfig != null, "cannot find kafka config:" + streamingConf);
+        final IIInstance ii = iiManager.getII(kafkaConfig.getIiName());
+        Preconditions.checkNotNull(ii);
+        Preconditions.checkArgument(ii.getSegments().size() > 0);
+        final IISegment iiSegment = ii.getSegments().get(0);
+
+        final Broker leadBroker = getLeadBroker(kafkaConfig, partitionId);
+        Preconditions.checkState(leadBroker != null, "cannot find lead broker");
+        final long earliestOffset = KafkaRequester.getLastOffset(kafkaConfig.getTopic(), partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaConfig);
+        long streamOffset = ii.getStreamOffsets().get(partitionId);
+        if (streamOffset < earliestOffset) {
+            streamOffset = earliestOffset;
+        }
+
+
+        KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), 0, streamOffset, kafkaConfig.getBrokers(), kafkaConfig) {
+            @Override
+            protected void consume(long offset, ByteBuffer payload) throws Exception {
+                byte[] bytes = new byte[payload.limit()];
+                payload.get(bytes);
+                getStreamQueue().put(new Stream(offset, bytes));
+            }
+        };
+        final IIDesc desc = ii.getDescriptor();
+
+        Executors.newSingleThreadExecutor().submit(consumer);
+        final IIStreamBuilder task = new IIStreamBuilder(consumer.getStreamQueue(), iiSegment.getStorageLocationIdentifier(), desc, partitionId);
+        task.setStreamParser(JsonStreamParser.instance);
+        final Future<?> future = Executors.newSingleThreadExecutor().submit(task);
+        future.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2b5495ce/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
new file mode 100644
index 0000000..8813cb3
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
@@ -0,0 +1,71 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.job.streaming;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Created by qianzhou on 3/26/15.
+ */
+public class StreamingCLI {
+
+    private static final Logger logger = LoggerFactory.getLogger(StreamingCLI.class);
+
+    public static void main(String[] args) {
+        try {
+            if (args.length < 2) {
+                printArgsError(args);
+                return;
+            }
+            if (args[0].equals("start")) {
+                String kafkaConfName = args[1];
+                StreamingBootstrap.getInstance(KylinConfig.getInstanceFromEnv()).startStreaming(kafkaConfName, 0);
+            } else if (args.equals("stop")) {
+
+            } else {
+                printArgsError(args);
+            }
+        } catch (Exception e) {
+        }
+    }
+
+    private static void printArgsError(String[] args) {
+        logger.warn("invalid args:" + StringUtils.join(args, " "));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2b5495ce/job/src/test/java/org/apache/kylin/job/IIStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/IIStreamBuilderTest.java b/job/src/test/java/org/apache/kylin/job/IIStreamBuilderTest.java
index 35a0fe9..d42da33 100644
--- a/job/src/test/java/org/apache/kylin/job/IIStreamBuilderTest.java
+++ b/job/src/test/java/org/apache/kylin/job/IIStreamBuilderTest.java
@@ -38,7 +38,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractKylinTestCase;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.streaming.StreamingBootstrap;
+import org.apache.kylin.job.streaming.StreamingBootstrap;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2b5495ce/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 38d6220..064ea11 100644
--- a/pom.xml
+++ b/pom.xml
@@ -616,6 +616,7 @@
                                 <exclude>**/BuildCubeWithEngineTest.java</exclude>
                                 <exclude>**/BuildIIWithEngineTest.java</exclude>
                                 <exclude>**/BuildIIWithStreamTest.java</exclude>
+                                <exclude>**/IIStreamBuilderTest.java</exclude>
                                 <exclude>**/SampleCubeSetupTest.java</exclude>
                                 <exclude>**/KylinQueryTest.java</exclude>
                                 <exclude>**/Kafka*Test.java</exclude>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2b5495ce/streaming/src/main/java/org/apache/kylin/streaming/KafkaRequester.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaRequester.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaRequester.java
index 699c0ed..ce87047 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaRequester.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaRequester.java
@@ -59,53 +59,65 @@ public final class KafkaRequester {
     private static final Logger logger = LoggerFactory.getLogger(KafkaRequester.class);
 
     public static TopicMeta getKafkaTopicMeta(KafkaConfig kafkaConfig) {
-        SimpleConsumer consumer;
+        SimpleConsumer consumer = null;
         for (Broker broker : kafkaConfig.getBrokers()) {
-            consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), "topic_meta_lookup");
-            List<String> topics = Collections.singletonList(kafkaConfig.getTopic());
-            TopicMetadataRequest req = new TopicMetadataRequest(topics);
-            TopicMetadataResponse resp = consumer.send(req);
-            final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
-            if (topicMetadatas.size() != 1) {
-                break;
-            }
-            final TopicMetadata topicMetadata = topicMetadatas.get(0);
-            if (topicMetadata.errorCode() != 0) {
-                break;
-            }
-            List<Integer> partitionIds = Lists.transform(topicMetadata.partitionsMetadata(), new Function<PartitionMetadata, Integer>() {
-                @Nullable
-                @Override
-                public Integer apply(PartitionMetadata partitionMetadata) {
-                    return partitionMetadata.partitionId();
+            try {
+                consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), "topic_meta_lookup");
+                List<String> topics = Collections.singletonList(kafkaConfig.getTopic());
+                TopicMetadataRequest req = new TopicMetadataRequest(topics);
+                TopicMetadataResponse resp = consumer.send(req);
+                final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
+                if (topicMetadatas.size() != 1) {
+                    break;
+                }
+                final TopicMetadata topicMetadata = topicMetadatas.get(0);
+                if (topicMetadata.errorCode() != 0) {
+                    break;
                 }
-            });
-            return new TopicMeta(kafkaConfig.getTopic(), partitionIds);
+                List<Integer> partitionIds = Lists.transform(topicMetadata.partitionsMetadata(), new Function<PartitionMetadata, Integer>() {
+                    @Nullable
+                    @Override
+                    public Integer apply(PartitionMetadata partitionMetadata) {
+                        return partitionMetadata.partitionId();
+                    }
+                });
+                return new TopicMeta(kafkaConfig.getTopic(), partitionIds);
+            } finally {
+                if (consumer != null) {
+                    consumer.close();
+                }
+            }
         }
         logger.debug("cannot find topic:" + kafkaConfig.getTopic());
         return null;
     }
 
     public static PartitionMetadata getPartitionMetadata(String topic, int partitionId, List<Broker> brokers, KafkaConfig kafkaConfig) {
-        SimpleConsumer consumer;
+        SimpleConsumer consumer = null;
         for (Broker broker : brokers) {
-            consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), "topic_meta_lookup");
-            List<String> topics = Collections.singletonList(topic);
-            TopicMetadataRequest req = new TopicMetadataRequest(topics);
-            TopicMetadataResponse resp = consumer.send(req);
-            final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
-            if (topicMetadatas.size() != 1) {
-                logger.warn("invalid topicMetadata size:" + topicMetadatas.size());
-                break;
-            }
-            final TopicMetadata topicMetadata = topicMetadatas.get(0);
-            if (topicMetadata.errorCode() != 0) {
-                logger.warn("fetching topicMetadata with errorCode:" + topicMetadata.errorCode());
-                break;
-            }
-            for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
-                if (partitionMetadata.partitionId() == partitionId) {
-                    return partitionMetadata;
+            try {
+                consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), "topic_meta_lookup");
+                List<String> topics = Collections.singletonList(topic);
+                TopicMetadataRequest req = new TopicMetadataRequest(topics);
+                TopicMetadataResponse resp = consumer.send(req);
+                final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
+                if (topicMetadatas.size() != 1) {
+                    logger.warn("invalid topicMetadata size:" + topicMetadatas.size());
+                    break;
+                }
+                final TopicMetadata topicMetadata = topicMetadatas.get(0);
+                if (topicMetadata.errorCode() != 0) {
+                    logger.warn("fetching topicMetadata with errorCode:" + topicMetadata.errorCode());
+                    break;
+                }
+                for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
+                    if (partitionMetadata.partitionId() == partitionId) {
+                        return partitionMetadata;
+                    }
+                }
+            } finally {
+                if (consumer != null) {
+                    consumer.close();
                 }
             }
         }
@@ -116,30 +128,38 @@ public final class KafkaRequester {
     public static FetchResponse fetchResponse(String topic, int partitionId, long offset, Broker broker, KafkaConfig kafkaConfig) {
         final String clientName = "client_" + topic + "_" + partitionId;
         SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), clientName);
-        kafka.api.FetchRequest req = new FetchRequestBuilder()
-                .clientId(clientName)
-                .addFetch(topic, partitionId, offset, kafkaConfig.getMaxReadCount()) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
-                .build();
-        return consumer.fetch(req);
+        try {
+            kafka.api.FetchRequest req = new FetchRequestBuilder()
+                    .clientId(clientName)
+                    .addFetch(topic, partitionId, offset, kafkaConfig.getMaxReadCount()) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
+                    .build();
+            return consumer.fetch(req);
+        } finally {
+            consumer.close();
+        }
     }
 
     public static long getLastOffset(String topic, int partitionId,
                                      long whichTime, Broker broker, KafkaConfig kafkaConfig) {
         String clientName = "client_" + topic + "_" + partitionId;
         SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), clientName);
-        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
-        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
-        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
-        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
-                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
-        OffsetResponse response = consumer.getOffsetsBefore(request);
+        try {
+            TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
+            Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
+            requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
+            kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
+                    requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
+            OffsetResponse response = consumer.getOffsetsBefore(request);
 
-        if (response.hasError()) {
-            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partitionId));
-            return 0;
+            if (response.hasError()) {
+                System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partitionId));
+                return 0;
+            }
+            long[] offsets = response.offsets(topic, partitionId);
+            return offsets[0];
+        } finally {
+            consumer.close();
         }
-        long[] offsets = response.offsets(topic, partitionId);
-        return offsets[0];
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2b5495ce/streaming/src/main/java/org/apache/kylin/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingBootstrap.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingBootstrap.java
deleted file mode 100644
index bd1ab42..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingBootstrap.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming;
-
-import com.google.common.base.Preconditions;
-import kafka.api.OffsetRequest;
-import kafka.cluster.Broker;
-import kafka.javaapi.PartitionMetadata;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.streaming.invertedindex.IIStreamBuilder;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-/**
- * Created by qianzhou on 3/26/15.
- */
-public class StreamingBootstrap {
-
-    private KylinConfig kylinConfig;
-    private StreamManager streamManager;
-    private IIManager iiManager;
-
-    public static StreamingBootstrap getInstance(KylinConfig kylinConfig) {
-        return new StreamingBootstrap(kylinConfig);
-    }
-
-    private StreamingBootstrap(KylinConfig kylinConfig) {
-        this.kylinConfig = kylinConfig;
-        this.streamManager = StreamManager.getInstance(kylinConfig);
-        this.iiManager = IIManager.getInstance(kylinConfig);
-    }
-
-    private static Broker getLeadBroker(KafkaConfig kafkaConfig, int partitionId) {
-        final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaConfig.getTopic(), partitionId, kafkaConfig.getBrokers(), kafkaConfig);
-        if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
-            return partitionMetadata.leader();
-        } else {
-            return null;
-        }
-    }
-
-    public void startStreaming(String streamingConf, int partitionId) throws Exception {
-        final KafkaConfig kafkaConfig = streamManager.getKafkaConfig(streamingConf);
-        Preconditions.checkArgument(kafkaConfig != null, "cannot find kafka config:" + streamingConf);
-        final IIInstance ii = iiManager.getII(kafkaConfig.getIiName());
-        Preconditions.checkNotNull(ii);
-
-        final Broker leadBroker = getLeadBroker(kafkaConfig, partitionId);
-        Preconditions.checkState(leadBroker != null, "cannot find lead broker");
-        final long earliestOffset = KafkaRequester.getLastOffset(kafkaConfig.getTopic(), partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaConfig);
-        long streamOffset = ii.getStreamOffsets().get(partitionId);
-        if (streamOffset < earliestOffset) {
-            streamOffset = earliestOffset;
-        }
-
-
-        KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), 0, streamOffset, kafkaConfig.getBrokers(), kafkaConfig) {
-            @Override
-            protected void consume(long offset, ByteBuffer payload) throws Exception {
-                byte[] bytes = new byte[payload.limit()];
-                payload.get(bytes);
-                getStreamQueue().put(new Stream(offset, bytes));
-            }
-        };
-        final IIDesc desc = ii.getDescriptor();
-        Executors.newSingleThreadExecutor().submit(consumer);
-        final IIStreamBuilder task = new IIStreamBuilder(consumer.getStreamQueue(), ii.getSegments().get(0).getStorageLocationIdentifier(), desc, partitionId);
-        task.setStreamParser(JsonStreamParser.instance);
-        final Future<?> future = Executors.newSingleThreadExecutor().submit(task);
-        future.get();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2b5495ce/streaming/src/main/java/org/apache/kylin/streaming/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingCLI.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingCLI.java
deleted file mode 100644
index dac8ce0..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingCLI.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Created by qianzhou on 3/26/15.
- */
-public class StreamingCLI {
-
-    private static final Logger logger = LoggerFactory.getLogger(StreamingCLI.class);
-
-    public static void main(String[] args) {
-        try {
-            if (args.length < 2) {
-                printArgsError(args);
-                return;
-            }
-            if (args[0].equals("start")) {
-                String kafkaConfName = args[1];
-                StreamingBootstrap.getInstance(KylinConfig.getInstanceFromEnv()).startStreaming(kafkaConfName, 0);
-            } else if (args.equals("stop")) {
-
-            } else {
-                printArgsError(args);
-            }
-        } catch (Exception e) {
-        }
-    }
-
-    private static void printArgsError(String[] args) {
-        logger.warn("invalid args:" + StringUtils.join(args, " "));
-    }
-
-}