You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/02/27 07:28:38 UTC
[06/10] incubator-kylin git commit: Kafka Consumer
Kafka Consumer
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/437a31ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/437a31ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/437a31ce
Branch: refs/heads/streaming
Commit: 437a31cef587ee5bce9bc89a2872d29269b639ef
Parents: 4e451be
Author: qianhao.zhou <qi...@ebay.com>
Authored: Mon Feb 16 17:45:54 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Mon Feb 16 17:45:54 2015 +0800
----------------------------------------------------------------------
pom.xml | 1 +
streaming/pom.xml | 35 ++++++
.../apache/kylin/streaming/kafka/Consumer.java | 117 ++++++++++++++++++
.../kylin/streaming/kafka/ConsumerConfig.java | 72 +++++++++++
.../apache/kylin/streaming/kafka/Requester.java | 121 +++++++++++++++++++
.../apache/kylin/streaming/kafka/Stream.java | 57 +++++++++
.../kylin/streaming/kafka/TopicConfig.java | 66 ++++++++++
.../apache/kylin/streaming/kafka/TopicMeta.java | 63 ++++++++++
.../kylin/streaming/kafka/KafkaBaseTest.java | 70 +++++++++++
.../streaming/kafka/KafkaConsumerTest.java | 106 ++++++++++++++++
.../kylin/streaming/kafka/RequesterTest.java | 84 +++++++++++++
.../kylin/streaming/kafka/TestConstants.java | 48 ++++++++
.../kylin/streaming/kafka/TestProducer.java | 98 +++++++++++++++
streaming/src/test/resources/log4j.xml | 51 ++++++++
14 files changed, 989 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/437a31ce/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0a85ba2..299294a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -553,6 +553,7 @@
<module>server</module>
<module>jdbc</module>
<module>invertedindex</module>
+ <module>streaming</module>
</modules>
<profiles>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/437a31ce/streaming/pom.xml
----------------------------------------------------------------------
diff --git a/streaming/pom.xml b/streaming/pom.xml
new file mode 100644
index 0000000..b9eb5a6
--- /dev/null
+++ b/streaming/pom.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>kylin</artifactId>
+ <groupId>org.apache.kylin</groupId>
+ <version>0.7.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>kylin-streaming</artifactId>
+ <name>Kylin:Streaming</name>
+
+ <dependencies>
+
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.10</artifactId>
+ <version>0.8.1</version>
+ </dependency>
+
+ </dependencies>
+
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/437a31ce/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
new file mode 100644
index 0000000..eaee2a1
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
@@ -0,0 +1,117 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming.kafka;
+
+import kafka.cluster.Broker;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.PartitionMetadata;
+import kafka.message.MessageAndOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Reads a single partition of a Kafka topic through the low-level {@code SimpleConsumer} API and
+ * hands every message to a bounded {@link BlockingQueue} as a {@link Stream}.
+ *
+ * <p>Reading starts at offset 0. Before every fetch the partition leader is looked up again via
+ * {@link Requester#getPartitionMetadata}. {@link #run()} keeps fetching until its thread is
+ * interrupted. When the queue is full, {@link #run()} blocks until a reader drains it, so no
+ * message is dropped.
+ *
+ * Created by qianzhou on 2/15/15.
+ */
+public class Consumer implements Runnable {
+
+    /** Pause after a fetch that returned no new messages, so an idle partition is not polled in a tight loop. */
+    private static final long IDLE_BACKOFF_MILLIS = 100L;
+
+    /** Pause after a failed leader lookup or fetch before trying again. */
+    private static final long ERROR_BACKOFF_MILLIS = 1000L;
+
+    private final String topic;
+    private final int partitionId;
+
+    private final ConsumerConfig consumerConfig;
+    private List<Broker> replicaBrokers;
+    /** Offset of the next message to fetch. */
+    private final AtomicLong offset = new AtomicLong();
+    private final BlockingQueue<Stream> streamQueue;
+
+    private final Logger logger;
+
+    /**
+     * @param topic          topic to read
+     * @param partitionId    partition of {@code topic} to read
+     * @param initialBrokers brokers used for the first leader lookup; later replaced by the partition's replicas
+     * @param consumerConfig connection settings; {@code getMaxReadCount()} is the capacity of the stream queue
+     */
+    public Consumer(String topic, int partitionId, List<Broker> initialBrokers, ConsumerConfig consumerConfig) {
+        this.topic = topic;
+        this.partitionId = partitionId;
+        this.consumerConfig = consumerConfig;
+        this.replicaBrokers = initialBrokers;
+        this.logger = LoggerFactory.getLogger("KafkaConsumer_" + topic + "_" + partitionId);
+        this.streamQueue = new ArrayBlockingQueue<Stream>(consumerConfig.getMaxReadCount());
+    }
+
+    /** Returns the queue this consumer fills; callers take messages from it. */
+    public BlockingQueue<Stream> getStreamQueue() {
+        return streamQueue;
+    }
+
+    /**
+     * Looks up the current leader of the partition and remembers its replicas for the next lookup.
+     *
+     * @return the leader, or {@code null} when the metadata is unavailable or reports an error
+     */
+    private Broker getLeadBroker() {
+        final PartitionMetadata partitionMetadata = Requester.getPartitionMetadata(topic, partitionId, replicaBrokers, consumerConfig);
+        if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
+            replicaBrokers = partitionMetadata.replicas();
+            return partitionMetadata.leader();
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Fetches and enqueues messages until the running thread is interrupted. Failed and empty
+     * fetches are retried after a short pause instead of in a busy loop.
+     */
+    @Override
+    public void run() {
+        try {
+            while (!Thread.currentThread().isInterrupted()) {
+                final int fetched = fetchOnce();
+                if (fetched < 0) {
+                    Thread.sleep(ERROR_BACKOFF_MILLIS);
+                } else if (fetched == 0) {
+                    Thread.sleep(IDLE_BACKOFF_MILLIS);
+                }
+            }
+        } catch (InterruptedException e) {
+            // keep the interrupt visible to whoever owns this thread (e.g. an ExecutorService)
+            Thread.currentThread().interrupt();
+        }
+        logger.info("consumer stopped at offset:" + offset.get());
+    }
+
+    /**
+     * Runs one leader-lookup + fetch cycle and puts every new message on the queue.
+     *
+     * @return number of messages enqueued, or {@code -1} if the leader lookup or fetch failed
+     * @throws InterruptedException if interrupted while waiting for room in the queue
+     */
+    private int fetchOnce() throws InterruptedException {
+        final FetchResponse fetchResponse;
+        try {
+            final Broker leadBroker = getLeadBroker();
+            if (leadBroker == null) {
+                logger.warn("cannot find lead broker");
+                return -1;
+            }
+            fetchResponse = Requester.fetchResponse(topic, partitionId, offset.get(), leadBroker, consumerConfig);
+        } catch (Exception e) {
+            // Kafka's Scala client may throw undeclared checked exceptions (e.g. socket errors); retry later
+            logger.warn("fetch failed at offset:" + offset.get(), e);
+            return -1;
+        }
+        final short errorCode = fetchResponse.errorCode(topic, partitionId);
+        if (errorCode != 0) {
+            logger.warn("fetch response offset:" + offset.get() + " errorCode:" + errorCode);
+            return -1;
+        }
+        int enqueued = 0;
+        for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partitionId)) {
+            if (messageAndOffset.offset() < offset.get()) {
+                // a compressed message set may start before the requested offset; skip what was already read
+                continue;
+            }
+            final ByteBuffer payload = messageAndOffset.message().payload();
+            //TODO use ByteBuffer maybe
+            final byte[] bytes = new byte[payload.remaining()];
+            payload.get(bytes);
+            logger.debug("get message offset:{}", messageAndOffset.offset());
+            streamQueue.put(new Stream(System.currentTimeMillis(), bytes));
+            // advance only after the message is safely queued; offsets are not guaranteed to be contiguous
+            offset.set(messageAndOffset.nextOffset());
+            enqueued++;
+        }
+        return enqueued;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/437a31ce/streaming/src/main/java/org/apache/kylin/streaming/kafka/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/ConsumerConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/ConsumerConfig.java
new file mode 100644
index 0000000..3bdbd13
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/kafka/ConsumerConfig.java
@@ -0,0 +1,72 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming.kafka;
+
+/**
+ * Settings shared by {@link Consumer} and the {@link Requester} helpers.
+ *
+ * <p>A plain mutable bean. Every property is {@code 0} until its setter is called.
+ *
+ * Created by qianzhou on 2/15/15.
+ */
+public class ConsumerConfig {
+
+    /** Handed to every {@code SimpleConsumer} created by {@link Requester} as its socket timeout. */
+    private int timeout;
+
+    /** Capacity of the message queue owned by each {@link Consumer}. */
+    private int maxReadCount;
+
+    /** Buffer size handed to every {@code SimpleConsumer} created by {@link Requester}. */
+    private int bufferSize;
+
+    // ---- getters ----
+
+    public int getTimeout() {
+        return this.timeout;
+    }
+
+    public int getMaxReadCount() {
+        return this.maxReadCount;
+    }
+
+    public int getBufferSize() {
+        return this.bufferSize;
+    }
+
+    // ---- setters ----
+
+    public void setTimeout(int newTimeout) {
+        this.timeout = newTimeout;
+    }
+
+    public void setMaxReadCount(int newMaxReadCount) {
+        this.maxReadCount = newMaxReadCount;
+    }
+
+    public void setBufferSize(int newBufferSize) {
+        this.bufferSize = newBufferSize;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/437a31ce/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
new file mode 100644
index 0000000..1f14a47
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
@@ -0,0 +1,121 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming.kafka;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import kafka.api.FetchRequestBuilder;
+import kafka.cluster.Broker;
+import kafka.javaapi.*;
+import kafka.javaapi.consumer.SimpleConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Created by qianzhou on 2/15/15.
+ */
+public final class Requester {
+
+ private static final Logger logger = LoggerFactory.getLogger(Requester.class);
+
+ public static TopicMeta getKafkaTopicMeta(TopicConfig topicConfig, ConsumerConfig consumerConfig) {
+ SimpleConsumer consumer;
+ for (Broker broker : topicConfig.getBrokers()) {
+ consumer = new SimpleConsumer(broker.host(), broker.port(), consumerConfig.getTimeout(), consumerConfig.getBufferSize(), "topic_meta_lookup");
+ List<String> topics = Collections.singletonList(topicConfig.getTopic());
+ TopicMetadataRequest req = new TopicMetadataRequest(topics);
+ TopicMetadataResponse resp = consumer.send(req);
+ final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
+ if (topicMetadatas.size() != 1) {
+ break;
+ }
+ final TopicMetadata topicMetadata = topicMetadatas.get(0);
+ if (topicMetadata.errorCode() != 0) {
+ break;
+ }
+ List<Integer> partitionIds = Lists.transform(topicMetadata.partitionsMetadata(), new Function<PartitionMetadata, Integer>() {
+ @Nullable
+ @Override
+ public Integer apply(PartitionMetadata partitionMetadata) {
+ return partitionMetadata.partitionId();
+ }
+ });
+ return new TopicMeta(topicConfig.getTopic(), partitionIds);
+ }
+ logger.debug("cannot find topic:" + topicConfig.getTopic());
+ return null;
+ }
+
+ public static PartitionMetadata getPartitionMetadata(String topic, int partitionId, List<Broker> brokers, ConsumerConfig consumerConfig) {
+ SimpleConsumer consumer;
+ for (Broker broker : brokers) {
+ consumer = new SimpleConsumer(broker.host(), broker.port(), consumerConfig.getTimeout(), consumerConfig.getBufferSize(), "topic_meta_lookup");
+ List<String> topics = Collections.singletonList(topic);
+ TopicMetadataRequest req = new TopicMetadataRequest(topics);
+ TopicMetadataResponse resp = consumer.send(req);
+ final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
+ if (topicMetadatas.size() != 1) {
+ logger.warn("invalid topicMetadata size:" + topicMetadatas.size());
+ break;
+ }
+ final TopicMetadata topicMetadata = topicMetadatas.get(0);
+ if (topicMetadata.errorCode() != 0) {
+ logger.warn("fetching topicMetadata with errorCode:" + topicMetadata.errorCode());
+ break;
+ }
+ for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
+ if (partitionMetadata.partitionId() == partitionId) {
+ return partitionMetadata;
+ }
+ }
+ }
+ logger.debug("cannot find PartitionMetadata, topic:" + topic + " partitionId:" + partitionId);
+ return null;
+ }
+
+ public static FetchResponse fetchResponse(String topic, int partitionId, long offset, Broker broker, ConsumerConfig consumerConfig) {
+ final String clientName = "client_" + topic + "_" + partitionId;
+ SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), consumerConfig.getTimeout(), consumerConfig.getBufferSize(), clientName);
+ kafka.api.FetchRequest req = new FetchRequestBuilder()
+ .clientId(clientName)
+ .addFetch(topic, partitionId, offset, consumerConfig.getTimeout()) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
+ .build();
+ return consumer.fetch(req);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/437a31ce/streaming/src/main/java/org/apache/kylin/streaming/kafka/Stream.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Stream.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Stream.java
new file mode 100644
index 0000000..2a0ede4
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Stream.java
@@ -0,0 +1,57 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming.kafka;
+
+/**
+ * A single message read from Kafka. It holds the raw payload bytes and a timestamp chosen by
+ * whoever created it.
+ *
+ * Created by qianzhou on 2/15/15.
+ */
+public class Stream {
+
+    private final byte[] rawData;
+    private final long timestamp;
+
+    /**
+     * @param timestamp time associated with the message, as supplied by the caller
+     * @param rawData   payload bytes; kept as-is, not copied
+     */
+    public Stream(long timestamp, byte[] rawData) {
+        this.rawData = rawData;
+        this.timestamp = timestamp;
+    }
+
+    /** Returns the payload array passed to the constructor (the same instance, not a copy). */
+    public byte[] getRawData() {
+        return this.rawData;
+    }
+
+    /** Returns the timestamp passed to the constructor. */
+    public long getTimestamp() {
+        return this.timestamp;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/437a31ce/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicConfig.java
new file mode 100644
index 0000000..4aa9671
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicConfig.java
@@ -0,0 +1,66 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming.kafka;
+
+import kafka.cluster.Broker;
+
+import java.util.List;
+
+/**
+ * Identifies which Kafka topic to read and which brokers to contact first. A plain mutable bean
+ * whose properties are {@code null} until set.
+ *
+ * Created by qianzhou on 2/16/15.
+ */
+public class TopicConfig {
+
+    /** Name of the topic. */
+    private String topic;
+
+    /** Brokers used to look up the topic's metadata. */
+    private List<Broker> brokers;
+
+    public void setTopic(String topicName) {
+        this.topic = topicName;
+    }
+
+    public String getTopic() {
+        return this.topic;
+    }
+
+    public void setBrokers(List<Broker> brokerList) {
+        this.brokers = brokerList;
+    }
+
+    public List<Broker> getBrokers() {
+        return this.brokers;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/437a31ce/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicMeta.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicMeta.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicMeta.java
new file mode 100644
index 0000000..7822797
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicMeta.java
@@ -0,0 +1,63 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming.kafka;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Immutable description of a Kafka topic: its name and the ids of its partitions.
+ *
+ * <p>The topic metadata should be invariant, otherwise will cause re-initialization of the Consumer.
+ *
+ * Created by qianzhou on 2/15/15.
+ */
+public class TopicMeta {
+
+    private final String name;
+
+    private final List<Integer> partitionIds;
+
+    /**
+     * @param name         topic name
+     * @param partitionIds partition ids of the topic. The list is copied, so later changes to the
+     *                     argument (or to a lazy view it wraps) are not visible through this object.
+     * @throws NullPointerException if {@code partitionIds} is {@code null}
+     */
+    public TopicMeta(String name, List<Integer> partitionIds) {
+        this.name = name;
+        // defensive copy: wrapping the caller's list alone would not make this object invariant
+        this.partitionIds = Collections.unmodifiableList(new java.util.ArrayList<Integer>(partitionIds));
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    /** Returns an unmodifiable snapshot of the partition ids. */
+    public List<Integer> getPartitionIds() {
+        return partitionIds;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/437a31ce/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java
new file mode 100644
index 0000000..a1f9b87
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java
@@ -0,0 +1,70 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming.kafka;
+
+import kafka.admin.AdminUtils;
+import kafka.common.TopicExistsException;
+import org.I0Itec.zkclient.ZkClient;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * Base class for the Kafka integration tests. It manages a ZooKeeper client shared by all tests
+ * of a class and provides a helper to create topics.
+ *
+ * Created by qianzhou on 2/16/15.
+ */
+public abstract class KafkaBaseTest {
+
+    protected static final Logger logger = LoggerFactory.getLogger("kafka test");
+
+    /** Session and connection timeout of the shared ZooKeeper client, in milliseconds. */
+    private static final int ZK_TIMEOUT_MILLIS = 30 * 1000;
+
+    protected static ZkClient zkClient;
+
+    @BeforeClass
+    public static void beforeClass() {
+        // Kafka reads its ZooKeeper nodes as plain strings; the default ZkClient serializer writes
+        // Java-serialized bytes, which makes topics created through AdminUtils unreadable by brokers
+        zkClient = new ZkClient(TestConstants.ZOOKEEPER, ZK_TIMEOUT_MILLIS, ZK_TIMEOUT_MILLIS, kafka.utils.ZKStringSerializer$.MODULE$);
+    }
+
+    /** Closes the shared ZooKeeper client. It is null-safe because subclasses may shadow {@link #beforeClass()}. */
+    @org.junit.AfterClass
+    public static void closeZkClient() {
+        if (zkClient != null) {
+            zkClient.close();
+            zkClient = null;
+        }
+    }
+
+    /**
+     * Creates {@code topic} with the given partition and replica counts. If the topic already
+     * exists, only a log message is written.
+     */
+    public static void createTopic(String topic, int partition, int replica) {
+        try {
+            AdminUtils.createTopic(zkClient, topic, partition, replica, new Properties());
+        } catch (TopicExistsException e) {
+            logger.info(e.getMessage());
+        }
+    }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/437a31ce/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java
new file mode 100644
index 0000000..d89695d
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java
@@ -0,0 +1,106 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming.kafka;
+
+import com.google.common.collect.Lists;
+import kafka.cluster.Broker;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * End-to-end check that one {@link Consumer} per partition receives at least the messages
+ * published by a {@link TestProducer} during the test.
+ *
+ * Created by qianzhou on 2/16/15.
+ */
+public class KafkaConsumerTest extends KafkaBaseTest {
+
+    private TestProducer producer;
+
+    private static final int TOTAL_SEND_COUNT = 100;
+
+    /** Maximum time to wait for the consumers to catch up once the producer has finished. */
+    private static final long CONSUME_TIMEOUT_MILLIS = 30 * 1000L;
+
+    @Before
+    public void before() {
+        producer = new TestProducer(TOTAL_SEND_COUNT);
+        producer.start();
+    }
+
+    @After
+    public void after() {
+        producer.stop();
+    }
+
+    private void waitForProducerToStop(TestProducer producer) {
+        while (!producer.isStopped()) {
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                return;
+            }
+        }
+    }
+
+    /** Sums the number of messages currently waiting in all queues. */
+    private static int countQueued(List<BlockingQueue<Stream>> queues) {
+        int count = 0;
+        for (BlockingQueue<Stream> queue : queues) {
+            count += queue.size();
+        }
+        return count;
+    }
+
+    @Test
+    public void test() throws InterruptedException {
+        TopicConfig topicConfig = new TopicConfig();
+        topicConfig.setTopic(TestConstants.TOPIC);
+        topicConfig.setBrokers(Collections.singletonList(TestConstants.BROKER));
+        ConsumerConfig consumerConfig = new ConsumerConfig();
+        consumerConfig.setBufferSize(64 * 1024);
+        consumerConfig.setMaxReadCount(1000);
+        consumerConfig.setTimeout(60 * 1000);
+        final TopicMeta kafkaTopicMeta = Requester.getKafkaTopicMeta(topicConfig, consumerConfig);
+        assertTrue("topic not found: " + TestConstants.TOPIC, kafkaTopicMeta != null);
+        final ExecutorService executorService = Executors.newFixedThreadPool(kafkaTopicMeta.getPartitionIds().size());
+        try {
+            List<BlockingQueue<Stream>> queues = Lists.newArrayList();
+            for (Integer partitionId : kafkaTopicMeta.getPartitionIds()) {
+                Consumer consumer = new Consumer(kafkaTopicMeta.getName(), partitionId, Lists.asList(TestConstants.BROKER, new Broker[0]), consumerConfig);
+                queues.add(consumer.getStreamQueue());
+                executorService.execute(consumer);
+            }
+            waitForProducerToStop(producer);
+            // consumers may still lag behind the producer, so poll until enough messages arrived or time is up
+            final long deadline = System.currentTimeMillis() + CONSUME_TIMEOUT_MILLIS;
+            int count = countQueued(queues);
+            while (count < TOTAL_SEND_COUNT && System.currentTimeMillis() < deadline) {
+                Thread.sleep(100);
+                count = countQueued(queues);
+            }
+            //since there will be historical data
+            assertTrue("only " + count + " messages consumed", count >= TOTAL_SEND_COUNT);
+        } finally {
+            // interrupt the consumer tasks instead of leaving the pool's threads running after the test
+            executorService.shutdownNow();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/437a31ce/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java
new file mode 100644
index 0000000..fd8cc63
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java
@@ -0,0 +1,84 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming.kafka;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.*;
+
+/**
+ * Integration test of {@link Requester#getKafkaTopicMeta} against the sandbox broker.
+ *
+ * <p>NOTE(review): this test assumes {@link TestConstants#TOPIC} already exists with exactly two
+ * partitions. Nothing here creates it.
+ *
+ * Created by qianzhou on 2/16/15.
+ */
+public class RequesterTest extends KafkaBaseTest {
+
+    private static final String UNEXISTED_TOPIC = "unexist_topic";
+
+    private static TopicConfig topicConfig;
+    private static ConsumerConfig consumerConfig;
+
+    /**
+     * Builds the shared configurations.
+     *
+     * <p>NOTE(review): this static method shadows {@code KafkaBaseTest.beforeClass()}. JUnit 4 does
+     * not run shadowed superclass {@code @BeforeClass} methods, so {@code zkClient} is not created
+     * for this test class.
+     */
+    @BeforeClass
+    public static void beforeClass() {
+        consumerConfig = new ConsumerConfig();
+        consumerConfig.setTimeout(60 * 1000);
+        consumerConfig.setMaxReadCount(1000);
+        consumerConfig.setBufferSize(64 * 1024);
+
+        topicConfig = newTopicConfig(TestConstants.TOPIC);
+    }
+
+    @AfterClass
+    public static void afterClass() {
+        // nothing to release
+    }
+
+    @Test
+    public void testTopicMeta() throws Exception {
+        final TopicMeta existingTopic = Requester.getKafkaTopicMeta(topicConfig, consumerConfig);
+        assertNotNull(existingTopic);
+        assertEquals(2, existingTopic.getPartitionIds().size());
+        assertEquals(topicConfig.getTopic(), existingTopic.getName());
+
+        final TopicMeta missingTopic = Requester.getKafkaTopicMeta(newTopicConfig(UNEXISTED_TOPIC), consumerConfig);
+        assertNull(missingTopic);
+    }
+
+    /** Returns a config for {@code topic} that points at the single sandbox broker. */
+    private static TopicConfig newTopicConfig(String topic) {
+        final TopicConfig config = new TopicConfig();
+        config.setTopic(topic);
+        config.setBrokers(Collections.singletonList(TestConstants.BROKER));
+        return config;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/437a31ce/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestConstants.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestConstants.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestConstants.java
new file mode 100644
index 0000000..85a6ba3
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestConstants.java
@@ -0,0 +1,48 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming.kafka;
+
+import kafka.cluster.Broker;
+
+/**
+ * Endpoints of the Kafka/ZooKeeper sandbox that the streaming integration tests run against,
+ * plus the topic they use.
+ *
+ * Created by qianzhou on 2/16/15.
+ */
+public class TestConstants {
+
+    /** Host running both ZooKeeper and the Kafka broker. */
+    private static final String SANDBOX_HOST = "sandbox.hortonworks.com";
+
+    /** ZooKeeper connect string of the sandbox. */
+    public static final String ZOOKEEPER = SANDBOX_HOST + ":2181";
+
+    /** The sandbox's single broker (id 0). */
+    public static final Broker BROKER = new Broker(0, SANDBOX_HOST, 6667);
+
+    /** Topic the tests produce to and consume from. */
+    public static final String TOPIC = "kafka_stream_test";
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/437a31ce/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java
new file mode 100644
index 0000000..54ad583
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java
@@ -0,0 +1,98 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming.kafka;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * Test helper that publishes up to {@code sendCount} string messages to
+ * {@link TestConstants#TOPIC} from a background thread, pausing 100 ms after each send.
+ *
+ * <p>{@link #isStopped()} becomes {@code true} when the thread has finished. That happens after
+ * all messages are sent, after {@link #stop()} is called, or after a send fails.
+ *
+ * Created by qianzhou on 2/16/15.
+ */
+public class TestProducer {
+
+    private volatile boolean stopped = false;
+
+    private static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
+
+    private final int sendCount;
+
+    /** @param sendCount maximum number of messages to publish */
+    public TestProducer(int sendCount) {
+        this.sendCount = sendCount;
+    }
+
+    /** Connects to {@link TestConstants#BROKER} and starts sending on a new non-daemon thread. */
+    public void start() {
+        Properties props = new Properties();
+        props.put("metadata.broker.list", TestConstants.BROKER.getConnectionString());
+        props.put("serializer.class", "kafka.serializer.StringEncoder");
+        props.put("request.required.acks", "1");
+        ProducerConfig config = new ProducerConfig(props);
+        final Producer<String, String> producer = new Producer<String, String>(config);
+
+        final Thread thread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                int count = 0;
+                try {
+                    while (!stopped && count < sendCount) {
+                        final KeyedMessage<String, String> message = new KeyedMessage<>(TestConstants.TOPIC, "current time is:" + System.currentTimeMillis());
+                        producer.send(message);
+                        count++;
+                        Thread.sleep(100);
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                } finally {
+                    // release the producer's connections, and always flag completion so waiters
+                    // never hang, even when send() throws
+                    producer.close();
+                    logger.debug("totally " + count + " messages have been sent");
+                    stopped = true;
+                }
+            }
+        });
+        thread.setDaemon(false);
+        thread.start();
+    }
+
+    public boolean isStopped() {
+        return stopped;
+    }
+
+    /** Asks the sending thread to finish after its current message. */
+    public void stop() {
+        stopped = true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/437a31ce/streaming/src/test/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/streaming/src/test/resources/log4j.xml b/streaming/src/test/resources/log4j.xml
new file mode 100644
index 0000000..a0afa96
--- /dev/null
+++ b/streaming/src/test/resources/log4j.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ ~ /*
+ ~
+ ~ * Licensed to the Apache Software Foundation (ASF) under one or more
+ ~
+ ~ * contributor license agreements. See the NOTICE file distributed with
+ ~
+ ~ * this work for additional information regarding copyright ownership.
+ ~
+ ~ * The ASF licenses this file to You under the Apache License, Version 2.0
+ ~
+ ~ * (the "License"); you may not use this file except in compliance with
+ ~
+ ~ * the License. You may obtain a copy of the License at
+ ~
+ ~ *
+ ~
+ ~ * http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ *
+ ~
+ ~ * Unless required by applicable law or agreed to in writing, software
+ ~
+ ~ * distributed under the License is distributed on an "AS IS" BASIS,
+ ~
+ ~ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~
+ ~ * See the License for the specific language governing permissions and
+ ~
+ ~ * limitations under the License.
+ ~
+ ~ */
+ -->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration debug="true"
+ xmlns:log4j='http://jakarta.apache.org/log4j/'>
+
+ <appender name="consoleAppender" class="org.apache.log4j.ConsoleAppender">
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d{dd MMM yyyy HH:mm:ss} %5p %c{1} - %m%n"/>
+ </layout>
+ </appender>
+
+ <root>
+ <level value="INFO"/>
+ <appender-ref ref="consoleAppender"/>
+ </root>
+
+</log4j:configuration>
\ No newline at end of file