You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/02/27 07:28:38 UTC

[06/10] incubator-kylin git commit: Kafka Consumer

Kafka Consumer


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/437a31ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/437a31ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/437a31ce

Branch: refs/heads/streaming
Commit: 437a31cef587ee5bce9bc89a2872d29269b639ef
Parents: 4e451be
Author: qianhao.zhou <qi...@ebay.com>
Authored: Mon Feb 16 17:45:54 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Mon Feb 16 17:45:54 2015 +0800

----------------------------------------------------------------------
 pom.xml                                         |   1 +
 streaming/pom.xml                               |  35 ++++++
 .../apache/kylin/streaming/kafka/Consumer.java  | 117 ++++++++++++++++++
 .../kylin/streaming/kafka/ConsumerConfig.java   |  72 +++++++++++
 .../apache/kylin/streaming/kafka/Requester.java | 121 +++++++++++++++++++
 .../apache/kylin/streaming/kafka/Stream.java    |  57 +++++++++
 .../kylin/streaming/kafka/TopicConfig.java      |  66 ++++++++++
 .../apache/kylin/streaming/kafka/TopicMeta.java |  63 ++++++++++
 .../kylin/streaming/kafka/KafkaBaseTest.java    |  70 +++++++++++
 .../streaming/kafka/KafkaConsumerTest.java      | 106 ++++++++++++++++
 .../kylin/streaming/kafka/RequesterTest.java    |  84 +++++++++++++
 .../kylin/streaming/kafka/TestConstants.java    |  48 ++++++++
 .../kylin/streaming/kafka/TestProducer.java     |  98 +++++++++++++++
 streaming/src/test/resources/log4j.xml          |  51 ++++++++
 14 files changed, 989 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/437a31ce/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0a85ba2..299294a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -553,6 +553,7 @@
         <module>server</module>
         <module>jdbc</module>
         <module>invertedindex</module>
+        <module>streaming</module>
     </modules>
 
     <profiles>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/437a31ce/streaming/pom.xml
----------------------------------------------------------------------
diff --git a/streaming/pom.xml b/streaming/pom.xml
new file mode 100644
index 0000000..b9eb5a6
--- /dev/null
+++ b/streaming/pom.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>kylin</artifactId>
+        <groupId>org.apache.kylin</groupId>
+        <version>0.7.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>kylin-streaming</artifactId>
+    <name>Kylin:Streaming</name>
+
+    <dependencies>
+
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.10</artifactId>
+            <version>0.8.1</version>
+        </dependency>
+
+    </dependencies>
+
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/437a31ce/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
new file mode 100644
index 0000000..eaee2a1
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
@@ -0,0 +1,201 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming.kafka;
+
+import kafka.cluster.Broker;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.PartitionMetadata;
+import kafka.message.MessageAndOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Reads one partition of a Kafka topic and puts every message payload, wrapped in a {@link Stream},
+ * into a bounded queue whose capacity is {@link ConsumerConfig#getMaxReadCount()}.
+ * <p>
+ * The partition leader is looked up through {@link Requester} and reused until a lookup or fetch
+ * fails. Reading starts at offset 0 and follows the offsets reported by Kafka. {@link #run()} loops
+ * until the executing thread is interrupted.
+ *
+ * Created by qianzhou on 2/15/15.
+ */
+public class Consumer implements Runnable {
+
+    /** Pause after a failed leader lookup or fetch, so an unreachable broker does not cause a busy spin. */
+    private static final long RETRY_BACKOFF_MS = 1000;
+
+    /** Pause after a fetch that yielded no new message, so an idle partition is not polled in a tight loop. */
+    private static final long IDLE_BACKOFF_MS = 100;
+
+    private final String topic;
+    private final int partitionId;
+
+    private final ConsumerConfig consumerConfig;
+    // replaced by the replica list of the latest successful partition metadata lookup
+    private List<Broker> replicaBrokers;
+    // offset of the next message to read
+    private final AtomicLong offset = new AtomicLong();
+    private final BlockingQueue<Stream> streamQueue;
+
+    private final Logger logger;
+
+    /**
+     * @param topic          topic to read
+     * @param partitionId    partition of the topic to read
+     * @param initialBrokers brokers asked for the partition metadata on the first leader lookup
+     * @param consumerConfig connection settings; its max read count is used as the queue capacity
+     */
+    public Consumer(String topic, int partitionId, List<Broker> initialBrokers, ConsumerConfig consumerConfig) {
+        this.topic = topic;
+        this.partitionId = partitionId;
+        this.consumerConfig = consumerConfig;
+        this.replicaBrokers = initialBrokers;
+        logger = LoggerFactory.getLogger("KafkaConsumer_" + topic + "_" + partitionId);
+        streamQueue = new ArrayBlockingQueue<Stream>(consumerConfig.getMaxReadCount());
+    }
+
+    /**
+     * @return the queue consumed messages are put into; {@link #run()} blocks while it is full
+     */
+    public BlockingQueue<Stream> getStreamQueue() {
+        return streamQueue;
+    }
+
+    /**
+     * Looks up the current leader of the partition and remembers its replicas for the next lookup.
+     *
+     * @return the leader, or null if no error-free partition metadata could be obtained
+     */
+    private Broker getLeadBroker() {
+        final PartitionMetadata partitionMetadata = Requester.getPartitionMetadata(topic, partitionId, replicaBrokers, consumerConfig);
+        if (partitionMetadata == null || partitionMetadata.errorCode() != 0) {
+            return null;
+        }
+        // keep the previous brokers rather than being left with nobody to ask next time
+        if (!partitionMetadata.replicas().isEmpty()) {
+            replicaBrokers = partitionMetadata.replicas();
+        }
+        return partitionMetadata.leader();
+    }
+
+    /**
+     * Fetches messages from the partition leader and enqueues them until the thread is interrupted.
+     * <p>
+     * A failed leader lookup or fetch is logged and retried after {@link #RETRY_BACKOFF_MS} ms with a
+     * fresh leader lookup, since leadership may have moved. A fetch that yields no new message is
+     * followed by a pause of {@link #IDLE_BACKOFF_MS} ms.
+     */
+    @Override
+    public void run() {
+        Broker leadBroker = null;
+        try {
+            while (!Thread.currentThread().isInterrupted()) {
+                long backoffMs = 0;
+                try {
+                    if (leadBroker == null) {
+                        leadBroker = getLeadBroker();
+                    }
+                    if (leadBroker == null) {
+                        logger.warn("cannot find lead broker");
+                        backoffMs = RETRY_BACKOFF_MS;
+                    } else {
+                        final FetchResponse fetchResponse = Requester.fetchResponse(topic, partitionId, offset.get(), leadBroker, consumerConfig);
+                        final short errorCode = fetchResponse.errorCode(topic, partitionId);
+                        if (errorCode != 0) {
+                            logger.warn("fetch response offset:" + offset.get() + " errorCode:" + errorCode);
+                            leadBroker = null;
+                            backoffMs = RETRY_BACKOFF_MS;
+                        } else if (enqueueMessages(fetchResponse) == 0) {
+                            backoffMs = IDLE_BACKOFF_MS;
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    throw e;
+                } catch (Exception e) {
+                    // connection problems surface as exceptions from the Kafka client; retry with a new lookup
+                    logger.warn("error consuming from offset:" + offset.get(), e);
+                    leadBroker = null;
+                    backoffMs = RETRY_BACKOFF_MS;
+                }
+                if (backoffMs > 0) {
+                    Thread.sleep(backoffMs);
+                }
+            }
+        } catch (InterruptedException e) {
+            // restore the flag so whoever runs this Runnable can observe the interruption
+            Thread.currentThread().interrupt();
+        }
+        logger.info("stopped at offset:" + offset.get());
+    }
+
+    /**
+     * Puts every message of the response at or after the current offset into the queue and advances
+     * the offset to the next offset reported by Kafka.
+     *
+     * @return the number of messages enqueued
+     * @throws InterruptedException if interrupted while waiting for space in the queue
+     */
+    private int enqueueMessages(FetchResponse fetchResponse) throws InterruptedException {
+        int count = 0;
+        for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partitionId)) {
+            // a compressed message set may begin before the requested offset; skip what was already read
+            if (messageAndOffset.offset() < offset.get()) {
+                continue;
+            }
+            final ByteBuffer payload = messageAndOffset.message().payload();
+            //TODO use ByteBuffer maybe
+            final byte[] bytes;
+            if (payload == null) {
+                // a message without payload is passed on as an empty array
+                bytes = new byte[0];
+            } else {
+                bytes = new byte[payload.remaining()];
+                payload.get(bytes);
+            }
+            logger.debug("get message offset:" + messageAndOffset.offset());
+            streamQueue.put(new Stream(System.currentTimeMillis(), bytes));
+            offset.set(messageAndOffset.nextOffset());
+            count++;
+        }
+        return count;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/437a31ce/streaming/src/main/java/org/apache/kylin/streaming/kafka/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/ConsumerConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/ConsumerConfig.java
new file mode 100644
index 0000000..3bdbd13
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/kafka/ConsumerConfig.java
@@ -0,0 +1,76 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming.kafka;
+
+/**
+ * Settings for talking to Kafka brokers and for buffering consumed messages. Every value is a plain
+ * int that defaults to 0 until set.
+ *
+ * Created by qianzhou on 2/15/15.
+ */
+public class ConsumerConfig {
+
+    // timeout handed to every SimpleConsumer created by Requester (presumably milliseconds)
+    private int timeout;
+    // capacity of the message queue of each Consumer
+    private int maxReadCount;
+    // buffer size handed to every SimpleConsumer created by Requester
+    private int bufferSize;
+
+    public void setTimeout(int timeout) {
+        this.timeout = timeout;
+    }
+
+    public int getTimeout() {
+        return this.timeout;
+    }
+
+    public void setMaxReadCount(int maxReadCount) {
+        this.maxReadCount = maxReadCount;
+    }
+
+    public int getMaxReadCount() {
+        return this.maxReadCount;
+    }
+
+    public void setBufferSize(int bufferSize) {
+        this.bufferSize = bufferSize;
+    }
+
+    public int getBufferSize() {
+        return this.bufferSize;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/437a31ce/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
new file mode 100644
index 0000000..1f14a47
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
@@ -0,0 +1,156 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming.kafka;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import kafka.api.FetchRequestBuilder;
+import kafka.cluster.Broker;
+import kafka.javaapi.*;
+import kafka.javaapi.consumer.SimpleConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Static helpers that send metadata and fetch requests to Kafka brokers. Every {@link SimpleConsumer}
+ * opened here is closed before the method returns.
+ *
+ * Created by qianzhou on 2/15/15.
+ */
+public final class Requester {
+
+    private static final Logger logger = LoggerFactory.getLogger(Requester.class);
+
+    private Requester() {
+    }
+
+    /**
+     * Looks up the partitions of a topic.
+     *
+     * @return the topic name and its partition ids, or null if no broker returned error-free metadata
+     */
+    public static TopicMeta getKafkaTopicMeta(TopicConfig topicConfig, ConsumerConfig consumerConfig) {
+        final TopicMetadata topicMetadata = getTopicMetadata(topicConfig.getTopic(), topicConfig.getBrokers(), consumerConfig);
+        if (topicMetadata == null) {
+            logger.debug("cannot find topic:" + topicConfig.getTopic());
+            return null;
+        }
+        // copy the result: Lists.transform returns a lazy view that re-applies the function on every access
+        List<Integer> partitionIds = Lists.newArrayList(Lists.transform(topicMetadata.partitionsMetadata(), new Function<PartitionMetadata, Integer>() {
+            @Nullable
+            @Override
+            public Integer apply(PartitionMetadata partitionMetadata) {
+                return partitionMetadata.partitionId();
+            }
+        }));
+        return new TopicMeta(topicConfig.getTopic(), partitionIds);
+    }
+
+    /**
+     * Looks up the metadata of one partition of a topic.
+     *
+     * @return the partition metadata, or null if the topic metadata could not be obtained or does not list the partition
+     */
+    public static PartitionMetadata getPartitionMetadata(String topic, int partitionId, List<Broker> brokers, ConsumerConfig consumerConfig) {
+        final TopicMetadata topicMetadata = getTopicMetadata(topic, brokers, consumerConfig);
+        if (topicMetadata != null) {
+            for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
+                if (partitionMetadata.partitionId() == partitionId) {
+                    return partitionMetadata;
+                }
+            }
+        }
+        logger.debug("cannot find PartitionMetadata, topic:" + topic + " partitionId:" + partitionId);
+        return null;
+    }
+
+    /**
+     * Fetches messages of one partition starting at {@code offset} from the given (leader) broker.
+     * At most {@link ConsumerConfig#getBufferSize()} bytes are requested, so a single message larger
+     * than the buffer size cannot be fetched.
+     */
+    public static FetchResponse fetchResponse(String topic, int partitionId, long offset, Broker broker, ConsumerConfig consumerConfig) {
+        final String clientName = "client_" + topic + "_" + partitionId;
+        final SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), consumerConfig.getTimeout(), consumerConfig.getBufferSize(), clientName);
+        try {
+            kafka.api.FetchRequest req = new FetchRequestBuilder()
+                    .clientId(clientName)
+                    .addFetch(topic, partitionId, offset, consumerConfig.getBufferSize())
+                    .build();
+            return consumer.fetch(req);
+        } finally {
+            consumer.close();
+        }
+    }
+
+    /**
+     * Asks the brokers in turn for the metadata of {@code topic}; a broker that fails or answers with
+     * an error is skipped.
+     *
+     * @return the first error-free metadata, or null if no broker returned one
+     */
+    private static TopicMetadata getTopicMetadata(String topic, List<Broker> brokers, ConsumerConfig consumerConfig) {
+        for (Broker broker : brokers) {
+            SimpleConsumer consumer = null;
+            try {
+                consumer = new SimpleConsumer(broker.host(), broker.port(), consumerConfig.getTimeout(), consumerConfig.getBufferSize(), "topic_meta_lookup");
+                List<String> topics = Collections.singletonList(topic);
+                TopicMetadataResponse resp = consumer.send(new TopicMetadataRequest(topics));
+                final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
+                if (topicMetadatas.size() != 1) {
+                    logger.warn("invalid topicMetadata size:" + topicMetadatas.size());
+                    continue;
+                }
+                final TopicMetadata topicMetadata = topicMetadatas.get(0);
+                if (topicMetadata.errorCode() != 0) {
+                    logger.warn("fetching topicMetadata with errorCode:" + topicMetadata.errorCode());
+                    continue;
+                }
+                return topicMetadata;
+            } catch (Exception e) {
+                logger.warn("error fetching topicMetadata from broker " + broker.host() + ":" + broker.port(), e);
+            } finally {
+                if (consumer != null) {
+                    consumer.close();
+                }
+            }
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/437a31ce/streaming/src/main/java/org/apache/kylin/streaming/kafka/Stream.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Stream.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Stream.java
new file mode 100644
index 0000000..2a0ede4
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Stream.java
@@ -0,0 +1,66 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming.kafka;
+
+/**
+ * A single message read from Kafka: its raw payload and a timestamp.
+ * <p>
+ * The payload array is stored and returned by reference, not copied.
+ *
+ * Created by qianzhou on 2/15/15.
+ */
+public class Stream {
+
+    private final long timestamp;
+    private final byte[] rawData;
+
+    /**
+     * @param timestamp time associated with the message ({@link Consumer} passes the wall-clock time
+     *                  in milliseconds at which it read the message)
+     * @param rawData   the message payload
+     */
+    public Stream(long timestamp, byte[] rawData) {
+        this.rawData = rawData;
+        this.timestamp = timestamp;
+    }
+
+    public long getTimestamp() {
+        return this.timestamp;
+    }
+
+    public byte[] getRawData() {
+        return this.rawData;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/437a31ce/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicConfig.java
new file mode 100644
index 0000000..4aa9671
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicConfig.java
@@ -0,0 +1,69 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming.kafka;
+
+import kafka.cluster.Broker;
+
+import java.util.List;
+
+/**
+ * Names a Kafka topic and the brokers to ask for its metadata.
+ *
+ * Created by qianzhou on 2/16/15.
+ */
+public class TopicConfig {
+
+    // name of the topic
+    private String topic;
+    // brokers asked, in list order, for the topic metadata; the list is kept by reference
+    private List<Broker> brokers;
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public String getTopic() {
+        return this.topic;
+    }
+
+    public void setBrokers(List<Broker> brokers) {
+        this.brokers = brokers;
+    }
+
+    public List<Broker> getBrokers() {
+        return this.brokers;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/437a31ce/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicMeta.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicMeta.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicMeta.java
new file mode 100644
index 0000000..7822797
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicMeta.java
@@ -0,0 +1,74 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming.kafka;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Name and partition ids of a Kafka topic.
+ * <p>
+ * The topic metadata should be invariant; a change would require re-initializing the Consumer. The
+ * partition id list is therefore copied on construction and exposed read-only.
+ *
+ * Created by qianzhou on 2/15/15.
+ */
+public class TopicMeta {
+
+    private final String name;
+
+    private final List<Integer> partitionIds;
+
+    /**
+     * @param name         topic name
+     * @param partitionIds partition ids of the topic; copied, so later changes to the argument are not seen
+     */
+    public TopicMeta(String name, List<Integer> partitionIds) {
+        this.name = name;
+        this.partitionIds = Collections.unmodifiableList(new ArrayList<Integer>(partitionIds));
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * @return the partition ids as an unmodifiable list
+     */
+    public List<Integer> getPartitionIds() {
+        return partitionIds;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/437a31ce/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java
new file mode 100644
index 0000000..a1f9b87
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java
@@ -0,0 +1,91 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming.kafka;
+
+import kafka.admin.AdminUtils;
+import kafka.common.TopicExistsException;
+import kafka.utils.ZKStringSerializer$;
+import org.I0Itec.zkclient.ZkClient;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * Base class of the Kafka integration tests: holds a ZooKeeper client connected to
+ * {@link TestConstants#ZOOKEEPER} and helps creating topics.
+ *
+ * Created by qianzhou on 2/16/15.
+ */
+public abstract class KafkaBaseTest {
+
+    protected static final Logger logger = LoggerFactory.getLogger("kafka test");
+
+    private static final int ZK_SESSION_TIMEOUT_MS = 30 * 1000;
+    private static final int ZK_CONNECTION_TIMEOUT_MS = 30 * 1000;
+
+    protected static ZkClient zkClient;
+
+    @BeforeClass
+    public static void beforeClass() {
+        // Kafka keeps its ZooKeeper data as plain strings; ZkClient's default serializer would write
+        // Java-serialized bytes the brokers cannot read, so Kafka's own serializer is passed explicitly
+        zkClient = new ZkClient(TestConstants.ZOOKEEPER, ZK_SESSION_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS, ZKStringSerializer$.MODULE$);
+    }
+
+    @AfterClass
+    public static void closeZkClient() {
+        // a subclass may hide beforeClass(), in which case no client was opened
+        if (zkClient != null) {
+            zkClient.close();
+            zkClient = null;
+        }
+    }
+
+    /**
+     * Creates {@code topic} with the given number of partitions and replication factor; if the topic
+     * already exists, only logs that.
+     */
+    public static void createTopic(String topic, int partition, int replica) {
+        try {
+            AdminUtils.createTopic(zkClient, topic, partition, replica, new Properties());
+        } catch (TopicExistsException e) {
+            logger.info(e.getMessage());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/437a31ce/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java
new file mode 100644
index 0000000..d89695d
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java
@@ -0,0 +1,126 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming.kafka;
+
+import com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Runs one {@link Consumer} per partition of {@link TestConstants#TOPIC} while a {@link TestProducer}
+ * sends {@value #TOTAL_SEND_COUNT} messages, then checks that at least that many were consumed.
+ *
+ * Created by qianzhou on 2/16/15.
+ */
+public class KafkaConsumerTest extends KafkaBaseTest {
+
+    private static final int TOTAL_SEND_COUNT = 100;
+
+    // how long the consumers get to catch up after the producer has stopped
+    private static final long CONSUME_TIMEOUT_MS = 60 * 1000;
+
+    private TestProducer producer;
+
+    @Before
+    public void before() {
+        producer = new TestProducer(TOTAL_SEND_COUNT);
+        producer.start();
+    }
+
+    @After
+    public void after() {
+        producer.stop();
+    }
+
+    private void waitForProducerToStop(TestProducer producer) throws InterruptedException {
+        while (!producer.isStopped()) {
+            Thread.sleep(100);
+        }
+    }
+
+    /** @return the number of messages currently waiting in all queues */
+    private static int countQueued(List<BlockingQueue<Stream>> queues) {
+        int count = 0;
+        for (BlockingQueue<Stream> queue : queues) {
+            count += queue.size();
+        }
+        return count;
+    }
+
+    @Test
+    public void test() throws InterruptedException {
+        TopicConfig topicConfig = new TopicConfig();
+        topicConfig.setTopic(TestConstants.TOPIC);
+        topicConfig.setBrokers(Collections.singletonList(TestConstants.BROKER));
+        ConsumerConfig consumerConfig = new ConsumerConfig();
+        consumerConfig.setBufferSize(64 * 1024);
+        consumerConfig.setMaxReadCount(1000);
+        consumerConfig.setTimeout(60 * 1000);
+        final TopicMeta kafkaTopicMeta = Requester.getKafkaTopicMeta(topicConfig, consumerConfig);
+        assertNotNull("cannot find topic " + TestConstants.TOPIC, kafkaTopicMeta);
+        final ExecutorService executorService = Executors.newFixedThreadPool(kafkaTopicMeta.getPartitionIds().size());
+        try {
+            List<BlockingQueue<Stream>> queues = Lists.newArrayList();
+            for (Integer partitionId : kafkaTopicMeta.getPartitionIds()) {
+                Consumer consumer = new Consumer(kafkaTopicMeta.getName(), partitionId, Collections.singletonList(TestConstants.BROKER), consumerConfig);
+                queues.add(consumer.getStreamQueue());
+                executorService.execute(consumer);
+            }
+            waitForProducerToStop(producer);
+            // consumers may lag behind the producer, so poll instead of counting once
+            final long deadline = System.currentTimeMillis() + CONSUME_TIMEOUT_MS;
+            int count = countQueued(queues);
+            while (count < TOTAL_SEND_COUNT && System.currentTimeMillis() < deadline) {
+                Thread.sleep(100);
+                count = countQueued(queues);
+            }
+            //since there will be historical data, more than TOTAL_SEND_COUNT messages may arrive
+            assertTrue("consumed " + count + " messages", count >= TOTAL_SEND_COUNT);
+        } finally {
+            // interrupt the consumer threads instead of leaving them running after the test
+            executorService.shutdownNow();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/437a31ce/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java
new file mode 100644
index 0000000..fd8cc63
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java
@@ -0,0 +1,90 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming.kafka;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Checks {@link Requester#getKafkaTopicMeta} for the test topic and for a topic that does not exist.
+ *
+ * Created by qianzhou on 2/16/15.
+ */
+public class RequesterTest extends KafkaBaseTest {
+
+    private static TopicConfig topicConfig;
+    private static ConsumerConfig consumerConfig;
+
+    private static final String UNEXISTED_TOPIC = "unexist_topic";
+
+    /**
+     * Builds the configs shared by the tests. This method hides {@link KafkaBaseTest#beforeClass()},
+     * so JUnit does not run the latter and no ZooKeeper client is opened; none is needed here.
+     */
+    @BeforeClass
+    public static void beforeClass() {
+        topicConfig = new TopicConfig();
+        topicConfig.setTopic(TestConstants.TOPIC);
+        topicConfig.setBrokers(Collections.singletonList(TestConstants.BROKER));
+        consumerConfig = new ConsumerConfig();
+        consumerConfig.setBufferSize(64 * 1024);
+        consumerConfig.setMaxReadCount(1000);
+        consumerConfig.setTimeout(60 * 1000);
+    }
+
+    @Test
+    public void testTopicMeta() throws Exception {
+        TopicMeta kafkaTopicMeta = Requester.getKafkaTopicMeta(topicConfig, consumerConfig);
+        assertNotNull(kafkaTopicMeta);
+        // the test topic is expected to have been created with two partitions
+        assertEquals(2, kafkaTopicMeta.getPartitionIds().size());
+        assertEquals(topicConfig.getTopic(), kafkaTopicMeta.getName());
+
+        TopicConfig anotherTopicConfig = new TopicConfig();
+        anotherTopicConfig.setBrokers(Collections.singletonList(TestConstants.BROKER));
+        anotherTopicConfig.setTopic(UNEXISTED_TOPIC);
+
+        // NOTE(review): if the broker auto-creates topics on metadata requests, this lookup creates
+        // the topic and a later run would find it - confirm auto.create.topics.enable is off
+        kafkaTopicMeta = Requester.getKafkaTopicMeta(anotherTopicConfig, consumerConfig);
+        assertNull(kafkaTopicMeta);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/437a31ce/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestConstants.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestConstants.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestConstants.java
new file mode 100644
index 0000000..85a6ba3
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestConstants.java
@@ -0,0 +1,48 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming.kafka;
+
+import kafka.cluster.Broker;
+
+/**
+ * Created by qianzhou on 2/16/15.
+ */
+public class TestConstants {
+
+    public static final String TOPIC = "kafka_stream_test";
+    public static final String ZOOKEEPER = "sandbox.hortonworks.com:2181";
+    public static final Broker BROKER = new Broker(0, "sandbox.hortonworks.com", 6667);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/437a31ce/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java
new file mode 100644
index 0000000..54ad583
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java
@@ -0,0 +1,98 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming.kafka;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * Test helper that publishes up to {@code sendCount} string messages to {@link TestConstants#TOPIC}
+ * from a background thread, pausing {@value #SEND_INTERVAL_MS} ms after each send.
+ * <p>
+ * {@link #isStopped()} becomes {@code true} when {@link #stop()} is called or when the sender thread
+ * finishes for any reason (all messages sent, interrupted, or a send failure).
+ */
+public class TestProducer {
+
+    private static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
+
+    /** Pause after each send, in milliseconds. */
+    private static final long SEND_INTERVAL_MS = 100;
+
+    private final int sendCount;
+
+    // written by the caller (stop()) and by the sender thread, hence volatile
+    private volatile boolean stopped = false;
+
+    public TestProducer(int sendCount) {
+        this.sendCount = sendCount;
+    }
+
+    /**
+     * Creates a Kafka producer for {@link TestConstants#BROKER} and starts the non-daemon sender thread.
+     * The producer is closed by the sender thread when it finishes.
+     */
+    public void start() {
+        Properties props = new Properties();
+        props.put("metadata.broker.list", TestConstants.BROKER.getConnectionString());
+        props.put("serializer.class", "kafka.serializer.StringEncoder");
+        props.put("request.required.acks", "1");
+        ProducerConfig config = new ProducerConfig(props);
+        final Producer<String, String> producer = new Producer<String, String>(config);
+
+        final Thread thread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                int count = 0;
+                try {
+                    while (!stopped && count < sendCount) {
+                        final KeyedMessage<String, String> message = new KeyedMessage<>(TestConstants.TOPIC, "current time is:" + System.currentTimeMillis());
+                        producer.send(message);
+                        count++;
+                        Thread.sleep(SEND_INTERVAL_MS);
+                    }
+                } catch (InterruptedException e) {
+                    // stop sending and keep the interrupt status set instead of swallowing it
+                    Thread.currentThread().interrupt();
+                } finally {
+                    logger.debug("totally {} messages have been sent", count);
+                    try {
+                        producer.close();
+                    } finally {
+                        // always flag completion, even if send() or close() threw,
+                        // so callers polling isStopped() cannot wait forever
+                        stopped = true;
+                    }
+                }
+            }
+        }, "kafka-test-producer");
+        thread.setDaemon(false);
+        thread.start();
+    }
+
+    /** @return {@code true} once {@link #stop()} has been called or the sender thread has finished */
+    public boolean isStopped() {
+        return stopped;
+    }
+
+    /** Asks the sender thread to stop before its next message; does not wait for the thread to exit. */
+    public void stop() {
+        stopped = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/437a31ce/streaming/src/test/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/streaming/src/test/resources/log4j.xml b/streaming/src/test/resources/log4j.xml
new file mode 100644
index 0000000..a0afa96
--- /dev/null
+++ b/streaming/src/test/resources/log4j.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ /*
+  ~
+  ~ * Licensed to the Apache Software Foundation (ASF) under one or more
+  ~
+  ~ * contributor license agreements. See the NOTICE file distributed with
+  ~
+  ~ * this work for additional information regarding copyright ownership.
+  ~
+  ~ * The ASF licenses this file to You under the Apache License, Version 2.0
+  ~
+  ~ * (the "License"); you may not use this file except in compliance with
+  ~
+  ~ * the License. You may obtain a copy of the License at
+  ~
+  ~ *
+  ~
+  ~ * http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ *
+  ~
+  ~ * Unless required by applicable law or agreed to in writing, software
+  ~
+  ~ * distributed under the License is distributed on an "AS IS" BASIS,
+  ~
+  ~ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~
+  ~ * See the License for the specific language governing permissions and
+  ~
+  ~ * limitations under the License.
+  ~
+  ~ */
+  -->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<!--
+  Logging for the streaming module's tests: every logger at INFO and above goes to the console.
+  log4j's own internal status output (the debug attribute) is deliberately left off to keep test output clean.
+-->
+<log4j:configuration xmlns:log4j='http://jakarta.apache.org/log4j/'>
+
+    <appender name="consoleAppender" class="org.apache.log4j.ConsoleAppender">
+        <layout class="org.apache.log4j.PatternLayout">
+            <!-- date, right-aligned level, last segment of the logger name, message -->
+            <param name="ConversionPattern" value="%d{dd MMM yyyy HH:mm:ss} %5p %c{1} - %m%n"/>
+        </layout>
+    </appender>
+
+    <root>
+        <level value="INFO"/>
+        <appender-ref ref="consoleAppender"/>
+    </root>
+
+</log4j:configuration>
\ No newline at end of file