You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/03 11:08:21 UTC

[25/26] incubator-kylin git commit: refactor

refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/0970d76f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/0970d76f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/0970d76f

Branch: refs/heads/streaming
Commit: 0970d76fd4e9d6b92e684cc64c5e97676f986132
Parents: 1f2fb28
Author: qianhao.zhou <qi...@ebay.com>
Authored: Tue Mar 3 16:06:58 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Tue Mar 3 16:06:58 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/streaming/KafkaConfig.java | 151 +++++++++++++++++++
 .../apache/kylin/streaming/KafkaConsumer.java   | 133 ++++++++++++++++
 .../org/apache/kylin/streaming/Requester.java   | 146 ++++++++++++++++++
 .../java/org/apache/kylin/streaming/Stream.java |  57 +++++++
 .../apache/kylin/streaming/StreamBuilder.java   |  95 ++++++++++++
 .../org/apache/kylin/streaming/TopicMeta.java   |  63 ++++++++
 .../apache/kylin/streaming/kafka/Consumer.java  | 120 ---------------
 .../kylin/streaming/kafka/KafkaConfig.java      | 151 -------------------
 .../apache/kylin/streaming/kafka/Requester.java | 146 ------------------
 .../apache/kylin/streaming/kafka/Stream.java    |  57 -------
 .../kylin/streaming/kafka/StreamBuilder.java    |  94 ------------
 .../apache/kylin/streaming/kafka/TopicMeta.java |  63 --------
 .../apache/kylin/streaming/KafkaBaseTest.java   |  78 ++++++++++
 .../apache/kylin/streaming/KafkaConfigTest.java |  65 ++++++++
 .../kylin/streaming/KafkaConsumerTest.java      |  98 ++++++++++++
 .../apache/kylin/streaming/RequesterTest.java   |  71 +++++++++
 .../apache/kylin/streaming/TestProducer.java    | 114 ++++++++++++++
 .../kylin/streaming/kafka/KafkaBaseTest.java    |  77 ----------
 .../kylin/streaming/kafka/KafkaConfigTest.java  |  64 --------
 .../streaming/kafka/KafkaConsumerTest.java      | 101 -------------
 .../kylin/streaming/kafka/RequesterTest.java    |  70 ---------
 .../kylin/streaming/kafka/TestProducer.java     | 115 --------------
 22 files changed, 1071 insertions(+), 1058 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
new file mode 100644
index 0000000..7d0cd6b
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.streaming;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import kafka.cluster.Broker;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Settings needed to read one partition of a Kafka topic: the seed brokers, the
+ * zookeeper address, the topic and partition, and the consumer tuning values
+ * (socket timeout, buffer size, max read count).
+ *
+ * Instances are plain mutable beans; use {@link #load(KafkaConfig)} to obtain an
+ * independent copy or {@link #load(Properties)} to build one from configuration.
+ *
+ * Created by qianzhou on 3/2/15.
+ */
+public class KafkaConfig {
+
+    private List<Broker> brokers;
+
+    private String zookeeper;
+
+    private String topic;
+
+    private int timeout;
+
+    private int maxReadCount;
+
+    private int bufferSize;
+
+    private int partitionId;
+
+    public int getTimeout() {
+        return timeout;
+    }
+
+    public void setTimeout(int timeout) {
+        this.timeout = timeout;
+    }
+
+    public int getMaxReadCount() {
+        return maxReadCount;
+    }
+
+    public void setMaxReadCount(int maxReadCount) {
+        this.maxReadCount = maxReadCount;
+    }
+
+    public int getBufferSize() {
+        return bufferSize;
+    }
+
+    public void setBufferSize(int bufferSize) {
+        this.bufferSize = bufferSize;
+    }
+
+    public List<Broker> getBrokers() {
+        return brokers;
+    }
+
+    public void setBrokers(List<Broker> brokers) {
+        this.brokers = brokers;
+    }
+
+    public String getZookeeper() {
+        return zookeeper;
+    }
+
+    public void setZookeeper(String zookeeper) {
+        this.zookeeper = zookeeper;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public int getPartitionId() {
+        return partitionId;
+    }
+
+    public void setPartitionId(int partitionId) {
+        this.partitionId = partitionId;
+    }
+
+    /**
+     * Creates a copy of the given configuration.
+     *
+     * @param config the configuration to copy; must not be null
+     * @return a new instance carrying the same values; its broker list is a separate
+     *         list object, so adding or removing brokers on one does not affect the other
+     * @throws NullPointerException if {@code config} is null
+     */
+    public static KafkaConfig load(KafkaConfig config) {
+        Preconditions.checkNotNull(config);
+        KafkaConfig result = new KafkaConfig();
+        result.setBufferSize(config.getBufferSize());
+        result.setMaxReadCount(config.getMaxReadCount());
+        result.setTimeout(config.getTimeout());
+        result.setTopic(config.getTopic());
+        result.setZookeeper(config.getZookeeper());
+        result.setPartitionId(config.getPartitionId());
+        // Copy the list instead of sharing it, so the two configs cannot change each other.
+        final List<Broker> sourceBrokers = config.getBrokers();
+        result.setBrokers(sourceBrokers == null ? null : Lists.newArrayList(sourceBrokers));
+        return result;
+    }
+
+    /**
+     * Builds a configuration from properties.
+     *
+     * Keys read: {@code consumer.bufferSize}, {@code consumer.maxReadCount},
+     * {@code consumer.timeout}, {@code partitionId} (all integers, required),
+     * {@code topic}, {@code zookeeper} (strings, stored as-is and may be absent) and
+     * {@code brokers} (required, comma separated {@code host:port} entries).
+     * Brokers are numbered from 0 in the order they are listed.
+     *
+     * @param properties the source properties; must not be null
+     * @return the populated configuration
+     * @throws NullPointerException     if {@code properties} or the {@code brokers} value is null
+     * @throws NumberFormatException    if an integer property or a broker port is missing or not a number
+     * @throws IllegalArgumentException if a broker entry is not of the form {@code host:port}
+     */
+    public static KafkaConfig load(Properties properties) {
+        Preconditions.checkNotNull(properties);
+        KafkaConfig result = new KafkaConfig();
+        result.setBufferSize(requiredInt(properties, "consumer.bufferSize"));
+        result.setMaxReadCount(requiredInt(properties, "consumer.maxReadCount"));
+        result.setTimeout(requiredInt(properties, "consumer.timeout"));
+        result.setTopic(properties.getProperty("topic"));
+        result.setZookeeper(properties.getProperty("zookeeper"));
+        result.setPartitionId(requiredInt(properties, "partitionId"));
+        result.setBrokers(parseBrokers(properties.getProperty("brokers")));
+        return result;
+    }
+
+    /** Reads an integer property, naming the offending key when it is missing or malformed. */
+    private static int requiredInt(Properties properties, String key) {
+        final String value = properties.getProperty(key);
+        if (value == null) {
+            // Same exception type Integer.parseInt(null) raised before, but with a usable message.
+            throw new NumberFormatException("missing required integer property: " + key);
+        }
+        try {
+            return Integer.parseInt(value.trim());
+        } catch (NumberFormatException e) {
+            final NumberFormatException wrapped = new NumberFormatException("property " + key + " is not an integer: " + value);
+            wrapped.initCause(e);
+            throw wrapped;
+        }
+    }
+
+    /** Parses a comma separated list of host:port entries into brokers with ids 0..n-1. */
+    private static List<Broker> parseBrokers(String brokerList) {
+        Preconditions.checkNotNull(brokerList, "missing required property: brokers");
+        int id = 0;
+        final List<Broker> brokers = Lists.newArrayList();
+        for (String entry : brokerList.split(",")) {
+            // Tolerate "host1:9092, host2:9092": surrounding blanks must not end up in the host name.
+            final String[] hostAndPort = entry.trim().split(":");
+            Preconditions.checkArgument(hostAndPort.length >= 2 && !hostAndPort[0].trim().isEmpty(),
+                    "invalid broker entry, expected host:port but was: %s", entry);
+            brokers.add(new Broker(id++, hostAndPort[0].trim(), Integer.parseInt(hostAndPort[1].trim())));
+        }
+        return brokers;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
new file mode 100644
index 0000000..e45b6e4
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.streaming;
+
+import kafka.api.OffsetRequest;
+import kafka.cluster.Broker;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.PartitionMetadata;
+import kafka.message.MessageAndOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Created by qianzhou on 2/15/15.
+ */
+public class KafkaConsumer implements Runnable {
+
+    private String topic;
+    private int partitionId;
+
+    private KafkaConfig kafkaConfig;
+    private List<Broker> replicaBrokers;
+    private AtomicLong offset = new AtomicLong();
+    private BlockingQueue<Stream> streamQueue;
+
+    private Logger logger;
+
+    public KafkaConsumer(String topic, int partitionId, List<Broker> initialBrokers, KafkaConfig kafkaConfig) {
+        this.topic = topic;
+        this.partitionId = partitionId;
+        this.kafkaConfig = kafkaConfig;
+        this.replicaBrokers = initialBrokers;
+        logger = LoggerFactory.getLogger("KafkaConsumer_" + topic + "_" + partitionId);
+        streamQueue = new ArrayBlockingQueue<Stream>(kafkaConfig.getMaxReadCount());
+    }
+
+    public BlockingQueue<Stream> getStreamQueue() {
+        return streamQueue;
+    }
+
+    private Broker getLeadBroker() {
+        final PartitionMetadata partitionMetadata = Requester.getPartitionMetadata(topic, partitionId, replicaBrokers, kafkaConfig);
+        if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
+            replicaBrokers = partitionMetadata.replicas();
+            return partitionMetadata.leader();
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public void run() {
+        try {
+            Broker leadBroker = getLeadBroker();
+            if (leadBroker == null) {
+                logger.warn("cannot find lead broker");
+            } else {
+                final long lastOffset = Requester.getLastOffset(topic, partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaConfig);
+                offset.set(lastOffset);
+            }
+            while (true) {
+                if (leadBroker == null) {
+                    leadBroker = getLeadBroker();
+                }
+                if (leadBroker == null) {
+                    logger.warn("cannot find lead broker");
+                    continue;
+                }
+
+                final FetchResponse fetchResponse = Requester.fetchResponse(topic, partitionId, offset.get(), leadBroker, kafkaConfig);
+                if (fetchResponse.errorCode(topic, partitionId) != 0) {
+                    logger.warn("fetch response offset:" + offset.get() + " errorCode:" + fetchResponse.errorCode(topic, partitionId));
+                    continue;
+                }
+                for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partitionId)) {
+                    final ByteBuffer payload = messageAndOffset.message().payload();
+                    //TODO use ByteBuffer maybe
+                    byte[] bytes = new byte[payload.limit()];
+                    payload.get(bytes);
+                    logger.debug("get message offset:" + messageAndOffset.offset());
+                    try {
+                        streamQueue.put(new Stream(System.currentTimeMillis(), bytes));
+                    } catch (InterruptedException e) {
+                        logger.error("error put streamQueue", e);
+                        break;
+                    }
+                    offset.incrementAndGet();
+                }
+            }
+        } catch (Exception e) {
+            logger.error("consumer has encountered an error", e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/Requester.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/Requester.java b/streaming/src/main/java/org/apache/kylin/streaming/Requester.java
new file mode 100644
index 0000000..79332af
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/Requester.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.streaming;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import kafka.api.FetchRequestBuilder;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.cluster.Broker;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.*;
+import kafka.javaapi.consumer.SimpleConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by qianzhou on 2/15/15.
+ */
+public final class Requester {
+
+    private static final Logger logger = LoggerFactory.getLogger(Requester.class);
+
+    public static TopicMeta getKafkaTopicMeta(KafkaConfig kafkaConfig) {
+        SimpleConsumer consumer;
+        for (Broker broker : kafkaConfig.getBrokers()) {
+            consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), "topic_meta_lookup");
+            List<String> topics = Collections.singletonList(kafkaConfig.getTopic());
+            TopicMetadataRequest req = new TopicMetadataRequest(topics);
+            TopicMetadataResponse resp = consumer.send(req);
+            final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
+            if (topicMetadatas.size() != 1) {
+                break;
+            }
+            final TopicMetadata topicMetadata = topicMetadatas.get(0);
+            if (topicMetadata.errorCode() != 0) {
+                break;
+            }
+            List<Integer> partitionIds = Lists.transform(topicMetadata.partitionsMetadata(), new Function<PartitionMetadata, Integer>() {
+                @Nullable
+                @Override
+                public Integer apply(PartitionMetadata partitionMetadata) {
+                    return partitionMetadata.partitionId();
+                }
+            });
+            return new TopicMeta(kafkaConfig.getTopic(), partitionIds);
+        }
+        logger.debug("cannot find topic:" + kafkaConfig.getTopic());
+        return null;
+    }
+
+    public static PartitionMetadata getPartitionMetadata(String topic, int partitionId, List<Broker> brokers, KafkaConfig kafkaConfig) {
+        SimpleConsumer consumer;
+        for (Broker broker : brokers) {
+            consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), "topic_meta_lookup");
+            List<String> topics = Collections.singletonList(topic);
+            TopicMetadataRequest req = new TopicMetadataRequest(topics);
+            TopicMetadataResponse resp = consumer.send(req);
+            final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
+            if (topicMetadatas.size() != 1) {
+                logger.warn("invalid topicMetadata size:" + topicMetadatas.size());
+                break;
+            }
+            final TopicMetadata topicMetadata = topicMetadatas.get(0);
+            if (topicMetadata.errorCode() != 0) {
+                logger.warn("fetching topicMetadata with errorCode:" + topicMetadata.errorCode());
+                break;
+            }
+            for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
+                if (partitionMetadata.partitionId() == partitionId) {
+                    return partitionMetadata;
+                }
+            }
+        }
+        logger.debug("cannot find PartitionMetadata, topic:" + topic + " partitionId:" + partitionId);
+        return null;
+    }
+
+    public static FetchResponse fetchResponse(String topic, int partitionId, long offset, Broker broker, KafkaConfig kafkaConfig) {
+        final String clientName = "client_" + topic + "_" + partitionId;
+        SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), clientName);
+        kafka.api.FetchRequest req = new FetchRequestBuilder()
+                .clientId(clientName)
+                .addFetch(topic, partitionId, offset, kafkaConfig.getMaxReadCount()) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
+                .build();
+        return consumer.fetch(req);
+    }
+
+    public static long getLastOffset(String topic, int partitionId,
+                                     long whichTime, Broker broker, KafkaConfig kafkaConfig) {
+        String clientName = "client_" + topic + "_" + partitionId;
+        SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), clientName);
+        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
+        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
+        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
+        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
+                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
+        OffsetResponse response = consumer.getOffsetsBefore(request);
+
+        if (response.hasError()) {
+            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partitionId));
+            return 0;
+        }
+        long[] offsets = response.offsets(topic, partitionId);
+        return offsets[0];
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/Stream.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/Stream.java b/streaming/src/main/java/org/apache/kylin/streaming/Stream.java
new file mode 100644
index 0000000..3cb43b6
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/Stream.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.streaming;
+
+/**
+ * One raw message together with the timestamp it was tagged with.
+ *
+ * The byte array is stored and returned by reference (no copy is made), so callers
+ * sharing an instance should treat the array as read-only.
+ *
+ * Created by qianzhou on 2/15/15.
+ */
+public class Stream {
+
+    private final byte[] rawData;
+    private final long timestamp;
+
+    /**
+     * @param timestamp time associated with the message, as a long value
+     * @param rawData   message payload; kept by reference
+     */
+    public Stream(long timestamp, byte[] rawData) {
+        this.rawData = rawData;
+        this.timestamp = timestamp;
+    }
+
+    /** @return the timestamp given at construction */
+    public long getTimestamp() {
+        return this.timestamp;
+    }
+
+    /** @return the payload array given at construction (the same instance, not a copy) */
+    public byte[] getRawData() {
+        return this.rawData;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
new file mode 100644
index 0000000..005c255
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.streaming;
+
+import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Drains a queue of {@link Stream} messages and hands them to {@link #build(List)}
+ * in batches. A batch is flushed as soon as its accumulated payload reaches
+ * 64 KB, or when 5 minutes have passed since the previous flush, whichever
+ * comes first. Each message is delivered to {@code build} exactly once.
+ *
+ * Intended to run on its own thread; interrupting that thread stops the loop.
+ *
+ * Created by qianzhou on 2/17/15.
+ */
+public abstract class StreamBuilder implements Runnable {
+
+    private static final Logger logger = LoggerFactory.getLogger(StreamBuilder.class);
+
+    /** Flush once the pending messages add up to this many payload bytes. */
+    private static final int BATCH_BUILD_BYTES_THRESHOLD = 64 * 1024;
+    /** Flush once this many milliseconds have passed since the last flush. */
+    private static final int BATCH_BUILD_INTERVAL_THRESHOLD = 5 * 60 * 1000;
+
+    private BlockingQueue<Stream> streamQueue;
+    private long lastBuildTime = System.currentTimeMillis();
+    private int bytesTotal = 0;
+
+    /**
+     * @param streamQueue queue the messages are taken from
+     */
+    public StreamBuilder(BlockingQueue<Stream> streamQueue) {
+        this.streamQueue = streamQueue;
+    }
+
+    /**
+     * Processes one batch of messages.
+     *
+     * @param streamsToBuild the messages collected since the previous call; never empty.
+     *                       The list is not reused afterwards, so implementations may keep it.
+     */
+    protected abstract void build(List<Stream> streamsToBuild);
+
+    /** Delivers a batch and restarts the size and time counters. */
+    private void buildStream(List<Stream> streams) {
+        build(streams);
+        clearCounter();
+    }
+
+    /** Resets the byte counter and marks "now" as the time of the last flush. */
+    private void clearCounter() {
+        lastBuildTime = System.currentTimeMillis();
+        bytesTotal = 0;
+    }
+
+    @Override
+    public void run() {
+        try {
+            List<Stream> batch = Lists.newArrayList();
+            clearCounter();
+            while (true) {
+                // Wait no longer than the rest of the current interval, so a pending batch
+                // is still flushed on time when no further message arrives.
+                final long remaining = BATCH_BUILD_INTERVAL_THRESHOLD - (System.currentTimeMillis() - lastBuildTime);
+                final Stream stream = streamQueue.poll(Math.max(remaining, 0L), java.util.concurrent.TimeUnit.MILLISECONDS);
+                if (stream != null) {
+                    batch.add(stream);
+                    bytesTotal += stream.getRawData().length;
+                }
+
+                final boolean sizeReached = bytesTotal >= BATCH_BUILD_BYTES_THRESHOLD;
+                final boolean intervalElapsed = (System.currentTimeMillis() - lastBuildTime) >= BATCH_BUILD_INTERVAL_THRESHOLD;
+                if (!sizeReached && !intervalElapsed) {
+                    continue;
+                }
+                if (batch.isEmpty()) {
+                    // Idle interval: nothing to build, just start a new interval.
+                    clearCounter();
+                } else {
+                    buildStream(batch);
+                    // Start a fresh list so already-built messages are never delivered again.
+                    batch = Lists.newArrayList();
+                }
+            }
+        } catch (InterruptedException e) {
+            // Restore the flag so the owner of this thread can observe the interruption.
+            Thread.currentThread().interrupt();
+            logger.error("StreamBuilder has been interrupted", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/TopicMeta.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/TopicMeta.java b/streaming/src/main/java/org/apache/kylin/streaming/TopicMeta.java
new file mode 100644
index 0000000..b93d589
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/TopicMeta.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.streaming;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Name and partition ids of a Kafka topic.
+ *
+ * The topic metadata should be invariant, otherwise will cause re-initialization of the Consumer.
+ * To guarantee that, the partition ids are copied at construction time and exposed
+ * only through an unmodifiable list.
+ *
+ * Created by qianzhou on 2/15/15.
+ */
+public class TopicMeta {
+
+    private final String name;
+
+    private final List<Integer> partitionIds;
+
+    /**
+     * @param name         topic name
+     * @param partitionIds ids of the topic's partitions; must not be null. The list is
+     *                     copied, so later changes to it (or to whatever backs it) do not
+     *                     affect this instance.
+     * @throws NullPointerException if {@code partitionIds} is null
+     */
+    public TopicMeta(String name, List<Integer> partitionIds) {
+        this.name = name;
+        // unmodifiableList alone is only a read-only view of the caller's list;
+        // snapshot it first so the contents really cannot change afterwards.
+        this.partitionIds = Collections.unmodifiableList(new java.util.ArrayList<Integer>(partitionIds));
+    }
+
+    /** @return the topic name */
+    public String getName() {
+        return name;
+    }
+
+    /** @return the partition ids as an unmodifiable list */
+    public List<Integer> getPartitionIds() {
+        return partitionIds;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
deleted file mode 100644
index 074ef01..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import kafka.api.OffsetRequest;
-import kafka.cluster.Broker;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.PartitionMetadata;
-import kafka.message.MessageAndOffset;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Created by qianzhou on 2/15/15.
- */
-public class Consumer implements Runnable {
-
-    private String topic;
-    private int partitionId;
-
-    private KafkaConfig kafkaConfig;
-    private List<Broker> replicaBrokers;
-    private AtomicLong offset = new AtomicLong();
-    private BlockingQueue<Stream> streamQueue;
-
-    private Logger logger;
-
-    public Consumer(String topic, int partitionId, List<Broker> initialBrokers, KafkaConfig kafkaConfig) {
-        this.topic = topic;
-        this.partitionId = partitionId;
-        this.kafkaConfig = kafkaConfig;
-        this.replicaBrokers = initialBrokers;
-        logger = LoggerFactory.getLogger("KafkaConsumer_" + topic + "_" + partitionId);
-        streamQueue = new ArrayBlockingQueue<Stream>(kafkaConfig.getMaxReadCount());
-    }
-
-    public BlockingQueue<Stream> getStreamQueue() {
-        return streamQueue;
-    }
-
-    private Broker getLeadBroker() {
-        final PartitionMetadata partitionMetadata = Requester.getPartitionMetadata(topic, partitionId, replicaBrokers, kafkaConfig);
-        if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
-            replicaBrokers = partitionMetadata.replicas();
-            return partitionMetadata.leader();
-        } else {
-            return null;
-        }
-    }
-
-    @Override
-    public void run() {
-        while (true) {
-            final Broker leadBroker = getLeadBroker();
-            if (leadBroker == null) {
-                logger.warn("cannot find lead broker");
-                continue;
-            }
-            final long lastOffset = Requester.getLastOffset(topic, partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaConfig);
-            offset.set(lastOffset);
-            final FetchResponse fetchResponse = Requester.fetchResponse(topic, partitionId, offset.get(), leadBroker, kafkaConfig);
-            if (fetchResponse.errorCode(topic, partitionId) != 0) {
-                logger.warn("fetch response offset:" + offset.get() + " errorCode:" + fetchResponse.errorCode(topic, partitionId));
-                continue;
-            }
-            for (MessageAndOffset messageAndOffset: fetchResponse.messageSet(topic, partitionId)) {
-                final ByteBuffer payload = messageAndOffset.message().payload();
-                //TODO use ByteBuffer maybe
-                byte[] bytes = new byte[payload.limit()];
-                payload.get(bytes);
-                logger.debug("get message offset:" + messageAndOffset.offset());
-                try {
-                    streamQueue.put(new Stream(System.currentTimeMillis(), bytes));
-                } catch (InterruptedException e) {
-                    logger.error("error put streamQueue", e);
-                }
-                offset.incrementAndGet();
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/kafka/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/KafkaConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/KafkaConfig.java
deleted file mode 100644
index 82513eb..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/KafkaConfig.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import kafka.cluster.Broker;
-
-import java.util.List;
-import java.util.Properties;
-
-/**
- * Created by qianzhou on 3/2/15.
- */
-public class KafkaConfig {
-
-    private List<Broker> brokers;
-
-    private String zookeeper;
-
-    private String topic;
-
-    private int timeout;
-
-    private int maxReadCount;
-
-    private int bufferSize;
-
-    private int partitionId;
-
-    public int getTimeout() {
-        return timeout;
-    }
-
-    public void setTimeout(int timeout) {
-        this.timeout = timeout;
-    }
-
-    public int getMaxReadCount() {
-        return maxReadCount;
-    }
-
-    public void setMaxReadCount(int maxReadCount) {
-        this.maxReadCount = maxReadCount;
-    }
-
-    public int getBufferSize() {
-        return bufferSize;
-    }
-
-    public void setBufferSize(int bufferSize) {
-        this.bufferSize = bufferSize;
-    }
-
-    public List<Broker> getBrokers() {
-        return brokers;
-    }
-
-    public void setBrokers(List<Broker> brokers) {
-        this.brokers = brokers;
-    }
-
-    public String getZookeeper() {
-        return zookeeper;
-    }
-
-    public void setZookeeper(String zookeeper) {
-        this.zookeeper = zookeeper;
-    }
-
-    public String getTopic() {
-        return topic;
-    }
-
-    public void setTopic(String topic) {
-        this.topic = topic;
-    }
-
-    public int getPartitionId() {
-        return partitionId;
-    }
-
-    public void setPartitionId(int partitionId) {
-        this.partitionId = partitionId;
-    }
-
-    public static KafkaConfig load(KafkaConfig config) {
-        KafkaConfig result = new KafkaConfig();
-        result.setBufferSize(config.getBufferSize());
-        result.setMaxReadCount(config.getMaxReadCount());
-        result.setTimeout(config.getTimeout());
-        result.setTopic(config.getTopic());
-        result.setZookeeper(config.getZookeeper());
-        result.setPartitionId(config.getPartitionId());
-        result.setBrokers(config.getBrokers());
-        return result;
-    }
-
-    public static KafkaConfig load(Properties properties) {
-        Preconditions.checkNotNull(properties);
-        KafkaConfig result = new KafkaConfig();
-        result.setBufferSize(Integer.parseInt(properties.getProperty("consumer.bufferSize")));
-        result.setMaxReadCount(Integer.parseInt(properties.getProperty("consumer.maxReadCount")));
-        result.setTimeout(Integer.parseInt(properties.getProperty("consumer.timeout")));
-        result.setTopic(properties.getProperty("topic"));
-        result.setZookeeper(properties.getProperty("zookeeper"));
-        result.setPartitionId(Integer.parseInt(properties.getProperty("partitionId")));
-
-        int id = 0;
-        List<Broker> brokers = Lists.newArrayList();
-        for (String str: properties.getProperty("brokers").split(",")) {
-            final String[] split = str.split(":");
-            final Broker broker = new Broker(id++, split[0], Integer.parseInt(split[1]));
-            brokers.add(broker);
-        }
-        result.setBrokers(brokers);
-        return result;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
deleted file mode 100644
index 8811695..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import kafka.api.FetchRequestBuilder;
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.cluster.Broker;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.*;
-import kafka.javaapi.consumer.SimpleConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Created by qianzhou on 2/15/15.
- */
-public final class Requester {
-
-    private static final Logger logger = LoggerFactory.getLogger(Requester.class);
-
-    public static TopicMeta getKafkaTopicMeta(KafkaConfig kafkaConfig) {
-        SimpleConsumer consumer;
-        for (Broker broker : kafkaConfig.getBrokers()) {
-            consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), "topic_meta_lookup");
-            List<String> topics = Collections.singletonList(kafkaConfig.getTopic());
-            TopicMetadataRequest req = new TopicMetadataRequest(topics);
-            TopicMetadataResponse resp = consumer.send(req);
-            final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
-            if (topicMetadatas.size() != 1) {
-                break;
-            }
-            final TopicMetadata topicMetadata = topicMetadatas.get(0);
-            if (topicMetadata.errorCode() != 0) {
-                break;
-            }
-            List<Integer> partitionIds = Lists.transform(topicMetadata.partitionsMetadata(), new Function<PartitionMetadata, Integer>() {
-                @Nullable
-                @Override
-                public Integer apply(PartitionMetadata partitionMetadata) {
-                    return partitionMetadata.partitionId();
-                }
-            });
-            return new TopicMeta(kafkaConfig.getTopic(), partitionIds);
-        }
-        logger.debug("cannot find topic:" + kafkaConfig.getTopic());
-        return null;
-    }
-
-    public static PartitionMetadata getPartitionMetadata(String topic, int partitionId, List<Broker> brokers, KafkaConfig kafkaConfig) {
-        SimpleConsumer consumer;
-        for (Broker broker : brokers) {
-            consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), "topic_meta_lookup");
-            List<String> topics = Collections.singletonList(topic);
-            TopicMetadataRequest req = new TopicMetadataRequest(topics);
-            TopicMetadataResponse resp = consumer.send(req);
-            final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
-            if (topicMetadatas.size() != 1) {
-                logger.warn("invalid topicMetadata size:" + topicMetadatas.size());
-                break;
-            }
-            final TopicMetadata topicMetadata = topicMetadatas.get(0);
-            if (topicMetadata.errorCode() != 0) {
-                logger.warn("fetching topicMetadata with errorCode:" + topicMetadata.errorCode());
-                break;
-            }
-            for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
-                if (partitionMetadata.partitionId() == partitionId) {
-                    return partitionMetadata;
-                }
-            }
-        }
-        logger.debug("cannot find PartitionMetadata, topic:" + topic + " partitionId:" + partitionId);
-        return null;
-    }
-
-    public static FetchResponse fetchResponse(String topic, int partitionId, long offset, Broker broker, KafkaConfig kafkaConfig) {
-        final String clientName = "client_" + topic + "_" + partitionId;
-        SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), clientName);
-        kafka.api.FetchRequest req = new FetchRequestBuilder()
-                .clientId(clientName)
-                .addFetch(topic, partitionId, offset, kafkaConfig.getMaxReadCount()) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
-                .build();
-        return consumer.fetch(req);
-    }
-
-    public static long getLastOffset(String topic, int partitionId,
-                                     long whichTime, Broker broker, KafkaConfig kafkaConfig) {
-        String clientName = "client_" + topic + "_" + partitionId;
-        SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), clientName);
-        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
-        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
-        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
-        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
-                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
-        OffsetResponse response = consumer.getOffsetsBefore(request);
-
-        if (response.hasError()) {
-            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partitionId));
-            return 0;
-        }
-        long[] offsets = response.offsets(topic, partitionId);
-        return offsets[0];
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/kafka/Stream.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Stream.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Stream.java
deleted file mode 100644
index 2a0ede4..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Stream.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-/**
- * Created by qianzhou on 2/15/15.
- */
-public class Stream {
-
-    private long timestamp;
-    private byte[] rawData;
-
-    public Stream(long timestamp, byte[] rawData) {
-        this.timestamp = timestamp;
-        this.rawData = rawData;
-    }
-
-    public byte[] getRawData() {
-        return rawData;
-    }
-
-    public long getTimestamp() {
-        return timestamp;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/kafka/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/StreamBuilder.java
deleted file mode 100644
index 145b5c8..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/StreamBuilder.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import com.google.common.collect.Lists;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-
-/**
- * Created by qianzhou on 2/17/15.
- */
-public abstract class StreamBuilder implements Runnable {
-
-    private List<BlockingQueue<Stream>> streamQueues;
-    private static final Logger logger = LoggerFactory.getLogger(StreamBuilder.class);
-    private final int batchBuildCount;
-
-    public StreamBuilder(List<BlockingQueue<Stream>> streamQueues, int batchBuildCount) {
-        this.streamQueues = streamQueues;
-        this.batchBuildCount = batchBuildCount;
-    }
-
-
-    private int getEarliestStreamIndex(Stream[] streamHead) {
-        long ts = Long.MAX_VALUE;
-        int idx = 0;
-        for (int i = 0; i < streamHead.length; i++) {
-            if (streamHead[i].getTimestamp() < ts) {
-                ts = streamHead[i].getTimestamp();
-                idx = i;
-            }
-        }
-        return idx;
-    }
-
-    protected abstract void build(List<Stream> streamsToBuild);
-
-    @Override
-    public void run() {
-        try {
-            Stream[] streamHead = new Stream[streamQueues.size()];
-            for (int i = 0; i < streamQueues.size(); i++) {
-                streamHead[i] = streamQueues.get(i).take();
-            }
-            List<Stream> streamToBuild = Lists.newArrayListWithCapacity(batchBuildCount);
-            while (true) {
-                if (streamToBuild.size() >= batchBuildCount) {
-                    build(streamToBuild);
-                    streamToBuild.clear();
-                }
-                int idx = getEarliestStreamIndex(streamHead);
-                streamToBuild.add(streamHead[idx]);
-                streamHead[idx] = streamQueues.get(idx).take();
-            }
-        } catch (InterruptedException e) {
-            logger.error("", e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicMeta.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicMeta.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicMeta.java
deleted file mode 100644
index 7822797..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicMeta.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import java.util.Collections;
-import java.util.List;
-
-/**
- * The topic metadata should be invariant, otherwise will cause re-initialization of the Consumer
- *
- * Created by qianzhou on 2/15/15.
- */
-public class TopicMeta {
-
-    private final String name;
-
-    private final List<Integer> partitionIds;
-
-    public TopicMeta(String name, List<Integer> partitionIds) {
-        this.name = name;
-        this.partitionIds = Collections.unmodifiableList(partitionIds);
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public List<Integer> getPartitionIds() {
-        return partitionIds;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/KafkaBaseTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/KafkaBaseTest.java b/streaming/src/test/java/org/apache/kylin/streaming/KafkaBaseTest.java
new file mode 100644
index 0000000..89277d2
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/KafkaBaseTest.java
@@ -0,0 +1,78 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import kafka.admin.AdminUtils;
+import kafka.common.TopicExistsException;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.kylin.streaming.KafkaConfig;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Created by qianzhou on 2/16/15.
+ */
+public abstract class KafkaBaseTest {
+
+    protected static final Logger logger = LoggerFactory.getLogger("kafka test");
+
+    protected static ZkClient zkClient;
+
+    protected static KafkaConfig kafkaConfig;
+
+    @BeforeClass
+    public static void beforeClass() throws IOException {
+        final Properties properties = new Properties();
+        properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
+        kafkaConfig = KafkaConfig.load(properties);
+
+        zkClient = new ZkClient(kafkaConfig.getZookeeper());
+    }
+
+
+    public static void createTopic(String topic, int partition, int replica) {
+        try {
+            AdminUtils.createTopic(zkClient, topic, partition, replica, new Properties());
+        } catch (TopicExistsException e) {
+            logger.info(e.getMessage());
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/KafkaConfigTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/KafkaConfigTest.java b/streaming/src/test/java/org/apache/kylin/streaming/KafkaConfigTest.java
new file mode 100644
index 0000000..5612763
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/KafkaConfigTest.java
@@ -0,0 +1,65 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import org.apache.kylin.streaming.KafkaConfig;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Created by qianzhou on 3/2/15.
+ */
+public class KafkaConfigTest {
+
+    @Test
+    public void test() throws IOException {
+        final Properties properties = new Properties();
+        properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
+        KafkaConfig config = KafkaConfig.load(properties);
+        assertEquals(1000, config.getMaxReadCount());
+        assertEquals(65536, config.getBufferSize());
+        assertEquals(60000, config.getTimeout());
+        assertEquals("sandbox.hortonworks.com:2181", config.getZookeeper());
+        assertEquals("kafka_stream_test", config.getTopic());
+        assertEquals(0, config.getPartitionId());
+        assertEquals(1, config.getBrokers().size());
+        assertEquals("sandbox.hortonworks.com:6667", config.getBrokers().get(0).getConnectionString());
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java b/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
new file mode 100644
index 0000000..91e06fc
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
@@ -0,0 +1,98 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Created by qianzhou on 2/16/15.
+ */
+public class KafkaConsumerTest extends KafkaBaseTest {
+
+    private TestProducer producer;
+
+    private static final int TOTAL_SEND_COUNT = 100;
+
+    @Before
+    public void before() throws IOException {
+        producer = new TestProducer(TOTAL_SEND_COUNT);
+        producer.start();
+    }
+
+    @After
+    public void after() {
+        producer.stop();
+    }
+
+    private void waitForProducerToStop(TestProducer producer) {
+        while (!producer.isStopped()) {
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    @Test
+    public void test() throws InterruptedException {
+        final TopicMeta kafkaTopicMeta = Requester.getKafkaTopicMeta(kafkaConfig);
+        final ExecutorService executorService = Executors.newFixedThreadPool(kafkaTopicMeta.getPartitionIds().size());
+        List<BlockingQueue<Stream>> queues = Lists.newArrayList();
+        for (Integer partitionId : kafkaTopicMeta.getPartitionIds()) {
+            KafkaConsumer consumer = new KafkaConsumer(kafkaTopicMeta.getName(), partitionId, kafkaConfig.getBrokers(), kafkaConfig);
+            queues.add(consumer.getStreamQueue());
+            executorService.execute(consumer);
+        }
+        waitForProducerToStop(producer);
+        int count = 0;
+        for (BlockingQueue<Stream> queue : queues) {
+            count += queue.size();
+        }
+        //since there will be historical data
+        assertTrue(count >= TOTAL_SEND_COUNT);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/RequesterTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/RequesterTest.java b/streaming/src/test/java/org/apache/kylin/streaming/RequesterTest.java
new file mode 100644
index 0000000..d54f22c
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/RequesterTest.java
@@ -0,0 +1,71 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import org.apache.kylin.streaming.KafkaConfig;
+import org.apache.kylin.streaming.Requester;
+import org.apache.kylin.streaming.TopicMeta;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Created by qianzhou on 2/16/15.
+ */
+public class RequesterTest extends KafkaBaseTest {
+
+    private static final String NON_EXISTED_TOPIC = "non_existent_topic";
+
+
+
+    @AfterClass
+    public static void afterClass() {
+    }
+
+    @Test
+    public void testTopicMeta() throws Exception {
+        TopicMeta kafkaTopicMeta = Requester.getKafkaTopicMeta(kafkaConfig);
+        assertNotNull(kafkaTopicMeta);
+        assertEquals(2, kafkaTopicMeta.getPartitionIds().size());
+        assertEquals(kafkaConfig.getTopic(), kafkaTopicMeta.getName());
+
+        KafkaConfig anotherTopicConfig = KafkaConfig.load(kafkaConfig);
+        anotherTopicConfig.setTopic(NON_EXISTED_TOPIC);
+
+        kafkaTopicMeta = Requester.getKafkaTopicMeta(anotherTopicConfig);
+        assertTrue(kafkaTopicMeta == null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/TestProducer.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/TestProducer.java b/streaming/src/test/java/org/apache/kylin/streaming/TestProducer.java
new file mode 100644
index 0000000..368904c
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/TestProducer.java
@@ -0,0 +1,114 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import kafka.cluster.Broker;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Created by qianzhou on 2/16/15.
+ */
+public class TestProducer {
+
+    private volatile boolean stopped = false;
+
+    private static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
+
+    private final int sendCount;
+
+    public TestProducer(int sendCount) {
+        this.sendCount = sendCount;
+    }
+
+    public void start() throws IOException {
+        final Properties properties = new Properties();
+        properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
+        final KafkaConfig kafkaConfig = KafkaConfig.load(properties);
+
+        Properties props = new Properties();
+        props.put("metadata.broker.list", StringUtils.join(Iterators.transform(kafkaConfig.getBrokers().iterator(), new Function<Broker, String>() {
+            @Nullable
+            @Override
+            public String apply(@Nullable Broker broker) {
+                return broker.getConnectionString();
+            }
+        }), ","));
+        props.put("serializer.class", "kafka.serializer.StringEncoder");
+        props.put("request.required.acks", "1");
+        ProducerConfig config = new ProducerConfig(props);
+        final Producer<String, String> producer = new Producer<String, String>(config);
+
+        final Thread thread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                int count = 0;
+                while (!stopped && count < sendCount) {
+                    final KeyedMessage<String, String> message = new KeyedMessage<>(kafkaConfig.getTopic(), "current time is:" + System.currentTimeMillis());
+                    producer.send(message);
+                    count++;
+                    try {
+                        Thread.sleep(100);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                }
+                logger.debug("totally " + count +" messages have been sent");
+                stopped = true;
+
+            }
+        });
+        thread.setDaemon(false);
+        thread.start();
+    }
+
+    public boolean isStopped() {
+        return stopped;
+    }
+
+    public void stop() {
+        stopped = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java
deleted file mode 100644
index 537a15d..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import kafka.admin.AdminUtils;
-import kafka.common.TopicExistsException;
-import org.I0Itec.zkclient.ZkClient;
-import org.junit.BeforeClass;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Properties;
-
-/**
- * Created by qianzhou on 2/16/15.
- */
-public abstract class KafkaBaseTest {
-
-    protected static final Logger logger = LoggerFactory.getLogger("kafka test");
-
-    protected static ZkClient zkClient;
-
-    protected static KafkaConfig kafkaConfig;
-
-    @BeforeClass
-    public static void beforeClass() throws IOException {
-        final Properties properties = new Properties();
-        properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
-        kafkaConfig = KafkaConfig.load(properties);
-
-        zkClient = new ZkClient(kafkaConfig.getZookeeper());
-    }
-
-
-    public static void createTopic(String topic, int partition, int replica) {
-        try {
-            AdminUtils.createTopic(zkClient, topic, partition, replica, new Properties());
-        } catch (TopicExistsException e) {
-            logger.info(e.getMessage());
-        }
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConfigTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConfigTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConfigTest.java
deleted file mode 100644
index 3c9bd87..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConfigTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Created by qianzhou on 3/2/15.
- */
-public class KafkaConfigTest {
-
-    @Test
-    public void test() throws IOException {
-        final Properties properties = new Properties();
-        properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
-        KafkaConfig config = KafkaConfig.load(properties);
-        assertEquals(1000, config.getMaxReadCount());
-        assertEquals(65536, config.getBufferSize());
-        assertEquals(60000, config.getTimeout());
-        assertEquals("sandbox.hortonworks.com:2181", config.getZookeeper());
-        assertEquals("kafka_stream_test", config.getTopic());
-        assertEquals(0, config.getPartitionId());
-        assertEquals(1, config.getBrokers().size());
-        assertEquals("sandbox.hortonworks.com:6667", config.getBrokers().get(0).getConnectionString());
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java
deleted file mode 100644
index 4449d37..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import com.google.common.collect.Lists;
-import kafka.cluster.Broker;
-import kafka.consumer.ConsumerConfig;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static org.junit.Assert.assertTrue;
-
-/**
- * Created by qianzhou on 2/16/15.
- */
-public class KafkaConsumerTest extends KafkaBaseTest {
-
-    private TestProducer producer;
-
-    private static final int TOTAL_SEND_COUNT = 100;
-
-    @Before
-    public void before() throws IOException {
-        producer = new TestProducer(TOTAL_SEND_COUNT);
-        producer.start();
-    }
-
-    @After
-    public void after() {
-        producer.stop();
-    }
-
-    private void waitForProducerToStop(TestProducer producer) {
-        while (!producer.isStopped()) {
-            try {
-                Thread.sleep(100);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-    @Test
-    public void test() throws InterruptedException {
-        final TopicMeta kafkaTopicMeta = Requester.getKafkaTopicMeta(kafkaConfig);
-        final ExecutorService executorService = Executors.newFixedThreadPool(kafkaTopicMeta.getPartitionIds().size());
-        List<BlockingQueue<Stream>> queues = Lists.newArrayList();
-        for (Integer partitionId : kafkaTopicMeta.getPartitionIds()) {
-            Consumer consumer = new Consumer(kafkaTopicMeta.getName(), partitionId, kafkaConfig.getBrokers(), kafkaConfig);
-            queues.add(consumer.getStreamQueue());
-            executorService.execute(consumer);
-        }
-        waitForProducerToStop(producer);
-        int count = 0;
-        for (BlockingQueue<Stream> queue : queues) {
-            count += queue.size();
-        }
-        //since there will be historical data
-        assertTrue(count >= TOTAL_SEND_COUNT);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java
deleted file mode 100644
index 694c1fd..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import org.junit.AfterClass;
-import org.junit.Test;
-
-import java.util.Collections;
-
-import static org.junit.Assert.*;
-
-/**
- * Created by qianzhou on 2/16/15.
- */
-public class RequesterTest extends KafkaBaseTest {
-
-    private static final String NON_EXISTED_TOPIC = "non_existent_topic";
-
-
-
-    @AfterClass
-    public static void afterClass() {
-    }
-
-    @Test
-    public void testTopicMeta() throws Exception {
-        TopicMeta kafkaTopicMeta = Requester.getKafkaTopicMeta(kafkaConfig);
-        assertNotNull(kafkaTopicMeta);
-        assertEquals(2, kafkaTopicMeta.getPartitionIds().size());
-        assertEquals(kafkaConfig.getTopic(), kafkaTopicMeta.getName());
-
-        KafkaConfig anotherTopicConfig = KafkaConfig.load(kafkaConfig);
-        anotherTopicConfig.setTopic(NON_EXISTED_TOPIC);
-
-        kafkaTopicMeta = Requester.getKafkaTopicMeta(anotherTopicConfig);
-        assertTrue(kafkaTopicMeta == null);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java
deleted file mode 100644
index cd3b166..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-import kafka.cluster.Broker;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.Properties;
-
-/**
- * Created by qianzhou on 2/16/15.
- */
-public class TestProducer {
-
-    private volatile boolean stopped = false;
-
-    private static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
-
-    private final int sendCount;
-
-    public TestProducer(int sendCount) {
-        this.sendCount = sendCount;
-    }
-
-    public void start() throws IOException {
-        final Properties properties = new Properties();
-        properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
-        final KafkaConfig kafkaConfig = KafkaConfig.load(properties);
-
-        Properties props = new Properties();
-        props.put("metadata.broker.list", StringUtils.join(Iterators.transform(kafkaConfig.getBrokers().iterator(), new Function<Broker, String>() {
-            @Nullable
-            @Override
-            public String apply(@Nullable Broker broker) {
-                return broker.getConnectionString();
-            }
-        }), ","));
-        props.put("serializer.class", "kafka.serializer.StringEncoder");
-        props.put("request.required.acks", "1");
-        ProducerConfig config = new ProducerConfig(props);
-        final Producer<String, String> producer = new Producer<String, String>(config);
-
-        final Thread thread = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                int count = 0;
-                while (!stopped && count < sendCount) {
-                    final KeyedMessage<String, String> message = new KeyedMessage<>(kafkaConfig.getTopic(), "current time is:" + System.currentTimeMillis());
-                    producer.send(message);
-                    count++;
-                    try {
-                        Thread.sleep(100);
-                    } catch (InterruptedException e) {
-                        e.printStackTrace();
-                    }
-                }
-                logger.debug("totally " + count +" messages have been sent");
-                stopped = true;
-
-            }
-        });
-        thread.setDaemon(false);
-        thread.start();
-    }
-
-    public boolean isStopped() {
-        return stopped;
-    }
-
-    public void stop() {
-        stopped = true;
-    }
-}