You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/03 11:08:15 UTC

[19/26] incubator-kylin git commit: refactor

refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/58a6f73f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/58a6f73f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/58a6f73f

Branch: refs/heads/streaming
Commit: 58a6f73f530a5008ee565c863b0473fef5d34200
Parents: c038cb4
Author: qianhao.zhou <qi...@ebay.com>
Authored: Mon Mar 2 18:10:48 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Mon Mar 2 18:10:48 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/streaming/kafka/Consumer.java  |  14 +-
 .../kylin/streaming/kafka/ConsumerConfig.java   |  72 ---------
 .../kylin/streaming/kafka/KafkaConfig.java      | 151 +++++++++++++++++++
 .../apache/kylin/streaming/kafka/Requester.java |  26 ++--
 .../kylin/streaming/kafka/TopicConfig.java      |  66 --------
 .../kylin/streaming/kafka/KafkaBaseTest.java    |  11 +-
 .../kylin/streaming/kafka/KafkaConfigTest.java  |  64 ++++++++
 .../streaming/kafka/KafkaConsumerTest.java      |  15 +-
 .../kylin/streaming/kafka/RequesterTest.java    |  26 +---
 .../kylin/streaming/kafka/TestConstants.java    |  48 ------
 .../kylin/streaming/kafka/TestProducer.java     |  25 ++-
 .../kafka_streaming_test/kafka.properties       |  10 ++
 12 files changed, 286 insertions(+), 242 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
index c825d4b..074ef01 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
@@ -56,20 +56,20 @@ public class Consumer implements Runnable {
     private String topic;
     private int partitionId;
 
-    private ConsumerConfig consumerConfig;
+    private KafkaConfig kafkaConfig;
     private List<Broker> replicaBrokers;
     private AtomicLong offset = new AtomicLong();
     private BlockingQueue<Stream> streamQueue;
 
     private Logger logger;
 
-    public Consumer(String topic, int partitionId, List<Broker> initialBrokers, ConsumerConfig consumerConfig) {
+    public Consumer(String topic, int partitionId, List<Broker> initialBrokers, KafkaConfig kafkaConfig) {
         this.topic = topic;
         this.partitionId = partitionId;
-        this.consumerConfig = consumerConfig;
+        this.kafkaConfig = kafkaConfig;
         this.replicaBrokers = initialBrokers;
         logger = LoggerFactory.getLogger("KafkaConsumer_" + topic + "_" + partitionId);
-        streamQueue = new ArrayBlockingQueue<Stream>(consumerConfig.getMaxReadCount());
+        streamQueue = new ArrayBlockingQueue<Stream>(kafkaConfig.getMaxReadCount());
     }
 
     public BlockingQueue<Stream> getStreamQueue() {
@@ -77,7 +77,7 @@ public class Consumer implements Runnable {
     }
 
     private Broker getLeadBroker() {
-        final PartitionMetadata partitionMetadata = Requester.getPartitionMetadata(topic, partitionId, replicaBrokers, consumerConfig);
+        final PartitionMetadata partitionMetadata = Requester.getPartitionMetadata(topic, partitionId, replicaBrokers, kafkaConfig);
         if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
             replicaBrokers = partitionMetadata.replicas();
             return partitionMetadata.leader();
@@ -94,9 +94,9 @@ public class Consumer implements Runnable {
                 logger.warn("cannot find lead broker");
                 continue;
             }
-            final long lastOffset = Requester.getLastOffset(topic, partitionId, OffsetRequest.EarliestTime(), leadBroker, consumerConfig);
+            final long lastOffset = Requester.getLastOffset(topic, partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaConfig);
             offset.set(lastOffset);
-            final FetchResponse fetchResponse = Requester.fetchResponse(topic, partitionId, offset.get(), leadBroker, consumerConfig);
+            final FetchResponse fetchResponse = Requester.fetchResponse(topic, partitionId, offset.get(), leadBroker, kafkaConfig);
             if (fetchResponse.errorCode(topic, partitionId) != 0) {
                 logger.warn("fetch response offset:" + offset.get() + " errorCode:" + fetchResponse.errorCode(topic, partitionId));
                 continue;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/main/java/org/apache/kylin/streaming/kafka/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/ConsumerConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/ConsumerConfig.java
deleted file mode 100644
index 3bdbd13..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/ConsumerConfig.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-/**
- * Created by qianzhou on 2/15/15.
- */
-public class ConsumerConfig {
-
-    private int timeout;
-
-    private int maxReadCount;
-
-    private int bufferSize;
-
-    public int getTimeout() {
-        return timeout;
-    }
-
-    public void setTimeout(int timeout) {
-        this.timeout = timeout;
-    }
-
-    public int getMaxReadCount() {
-        return maxReadCount;
-    }
-
-    public void setMaxReadCount(int maxReadCount) {
-        this.maxReadCount = maxReadCount;
-    }
-
-    public int getBufferSize() {
-        return bufferSize;
-    }
-
-    public void setBufferSize(int bufferSize) {
-        this.bufferSize = bufferSize;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/main/java/org/apache/kylin/streaming/kafka/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/KafkaConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/KafkaConfig.java
new file mode 100644
index 0000000..82513eb
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/kafka/KafkaConfig.java
@@ -0,0 +1,151 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming.kafka;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import kafka.cluster.Broker;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Created by qianzhou on 3/2/15.
+ */
+public class KafkaConfig {
+
+    private List<Broker> brokers;
+
+    private String zookeeper;
+
+    private String topic;
+
+    private int timeout;
+
+    private int maxReadCount;
+
+    private int bufferSize;
+
+    private int partitionId;
+
+    public int getTimeout() {
+        return timeout;
+    }
+
+    public void setTimeout(int timeout) {
+        this.timeout = timeout;
+    }
+
+    public int getMaxReadCount() {
+        return maxReadCount;
+    }
+
+    public void setMaxReadCount(int maxReadCount) {
+        this.maxReadCount = maxReadCount;
+    }
+
+    public int getBufferSize() {
+        return bufferSize;
+    }
+
+    public void setBufferSize(int bufferSize) {
+        this.bufferSize = bufferSize;
+    }
+
+    public List<Broker> getBrokers() {
+        return brokers;
+    }
+
+    public void setBrokers(List<Broker> brokers) {
+        this.brokers = brokers;
+    }
+
+    public String getZookeeper() {
+        return zookeeper;
+    }
+
+    public void setZookeeper(String zookeeper) {
+        this.zookeeper = zookeeper;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public int getPartitionId() {
+        return partitionId;
+    }
+
+    public void setPartitionId(int partitionId) {
+        this.partitionId = partitionId;
+    }
+
+    public static KafkaConfig load(KafkaConfig config) {
+        KafkaConfig result = new KafkaConfig();
+        result.setBufferSize(config.getBufferSize());
+        result.setMaxReadCount(config.getMaxReadCount());
+        result.setTimeout(config.getTimeout());
+        result.setTopic(config.getTopic());
+        result.setZookeeper(config.getZookeeper());
+        result.setPartitionId(config.getPartitionId());
+        result.setBrokers(config.getBrokers());
+        return result;
+    }
+
+    public static KafkaConfig load(Properties properties) {
+        Preconditions.checkNotNull(properties);
+        KafkaConfig result = new KafkaConfig();
+        result.setBufferSize(Integer.parseInt(properties.getProperty("consumer.bufferSize")));
+        result.setMaxReadCount(Integer.parseInt(properties.getProperty("consumer.maxReadCount")));
+        result.setTimeout(Integer.parseInt(properties.getProperty("consumer.timeout")));
+        result.setTopic(properties.getProperty("topic"));
+        result.setZookeeper(properties.getProperty("zookeeper"));
+        result.setPartitionId(Integer.parseInt(properties.getProperty("partitionId")));
+
+        int id = 0;
+        List<Broker> brokers = Lists.newArrayList();
+        for (String str: properties.getProperty("brokers").split(",")) {
+            final String[] split = str.split(":");
+            final Broker broker = new Broker(id++, split[0], Integer.parseInt(split[1]));
+            brokers.add(broker);
+        }
+        result.setBrokers(brokers);
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
index f4cdd8e..8811695 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
@@ -58,11 +58,11 @@ public final class Requester {
 
     private static final Logger logger = LoggerFactory.getLogger(Requester.class);
 
-    public static TopicMeta getKafkaTopicMeta(TopicConfig topicConfig, ConsumerConfig consumerConfig) {
+    public static TopicMeta getKafkaTopicMeta(KafkaConfig kafkaConfig) {
         SimpleConsumer consumer;
-        for (Broker broker : topicConfig.getBrokers()) {
-            consumer = new SimpleConsumer(broker.host(), broker.port(), consumerConfig.getTimeout(), consumerConfig.getBufferSize(), "topic_meta_lookup");
-            List<String> topics = Collections.singletonList(topicConfig.getTopic());
+        for (Broker broker : kafkaConfig.getBrokers()) {
+            consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), "topic_meta_lookup");
+            List<String> topics = Collections.singletonList(kafkaConfig.getTopic());
             TopicMetadataRequest req = new TopicMetadataRequest(topics);
             TopicMetadataResponse resp = consumer.send(req);
             final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
@@ -80,16 +80,16 @@ public final class Requester {
                     return partitionMetadata.partitionId();
                 }
             });
-            return new TopicMeta(topicConfig.getTopic(), partitionIds);
+            return new TopicMeta(kafkaConfig.getTopic(), partitionIds);
         }
-        logger.debug("cannot find topic:" + topicConfig.getTopic());
+        logger.debug("cannot find topic:" + kafkaConfig.getTopic());
         return null;
     }
 
-    public static PartitionMetadata getPartitionMetadata(String topic, int partitionId, List<Broker> brokers, ConsumerConfig consumerConfig) {
+    public static PartitionMetadata getPartitionMetadata(String topic, int partitionId, List<Broker> brokers, KafkaConfig kafkaConfig) {
         SimpleConsumer consumer;
         for (Broker broker : brokers) {
-            consumer = new SimpleConsumer(broker.host(), broker.port(), consumerConfig.getTimeout(), consumerConfig.getBufferSize(), "topic_meta_lookup");
+            consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), "topic_meta_lookup");
             List<String> topics = Collections.singletonList(topic);
             TopicMetadataRequest req = new TopicMetadataRequest(topics);
             TopicMetadataResponse resp = consumer.send(req);
@@ -113,20 +113,20 @@ public final class Requester {
         return null;
     }
 
-    public static FetchResponse fetchResponse(String topic, int partitionId, long offset, Broker broker, ConsumerConfig consumerConfig) {
+    public static FetchResponse fetchResponse(String topic, int partitionId, long offset, Broker broker, KafkaConfig kafkaConfig) {
         final String clientName = "client_" + topic + "_" + partitionId;
-        SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), consumerConfig.getTimeout(), consumerConfig.getBufferSize(), clientName);
+        SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), clientName);
         kafka.api.FetchRequest req = new FetchRequestBuilder()
                 .clientId(clientName)
-                .addFetch(topic, partitionId, offset, consumerConfig.getMaxReadCount()) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
+                .addFetch(topic, partitionId, offset, kafkaConfig.getMaxReadCount()) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
                 .build();
         return consumer.fetch(req);
     }
 
     public static long getLastOffset(String topic, int partitionId,
-                                     long whichTime, Broker broker, ConsumerConfig consumerConfig) {
+                                     long whichTime, Broker broker, KafkaConfig kafkaConfig) {
         String clientName = "client_" + topic + "_" + partitionId;
-        SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), consumerConfig.getTimeout(), consumerConfig.getBufferSize(), clientName);
+        SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), clientName);
         TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
         Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
         requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicConfig.java
deleted file mode 100644
index 4aa9671..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicConfig.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import kafka.cluster.Broker;
-
-import java.util.List;
-
-/**
- * Created by qianzhou on 2/16/15.
- */
-public class TopicConfig {
-
-    private String topic;
-
-    private List<Broker> brokers;
-
-    public String getTopic() {
-        return topic;
-    }
-
-    public void setTopic(String topic) {
-        this.topic = topic;
-    }
-
-    public List<Broker> getBrokers() {
-        return brokers;
-    }
-
-    public void setBrokers(List<Broker> brokers) {
-        this.brokers = brokers;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java
index a1f9b87..537a15d 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java
@@ -41,6 +41,7 @@ import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.Properties;
 
 /**
@@ -52,9 +53,15 @@ public abstract class KafkaBaseTest {
 
     protected static ZkClient zkClient;
 
+    protected static KafkaConfig kafkaConfig;
+
     @BeforeClass
-    public static void beforeClass() {
-        zkClient = new ZkClient(TestConstants.ZOOKEEPER);
+    public static void beforeClass() throws IOException {
+        final Properties properties = new Properties();
+        properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
+        kafkaConfig = KafkaConfig.load(properties);
+
+        zkClient = new ZkClient(kafkaConfig.getZookeeper());
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConfigTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConfigTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConfigTest.java
new file mode 100644
index 0000000..3c9bd87
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConfigTest.java
@@ -0,0 +1,64 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming.kafka;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Created by qianzhou on 3/2/15.
+ */
+public class KafkaConfigTest {
+
+    @Test
+    public void test() throws IOException {
+        final Properties properties = new Properties();
+        properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
+        KafkaConfig config = KafkaConfig.load(properties);
+        assertEquals(1000, config.getMaxReadCount());
+        assertEquals(65536, config.getBufferSize());
+        assertEquals(60000, config.getTimeout());
+        assertEquals("sandbox.hortonworks.com:2181", config.getZookeeper());
+        assertEquals("kafka_stream_test", config.getTopic());
+        assertEquals(0, config.getPartitionId());
+        assertEquals(1, config.getBrokers().size());
+        assertEquals("sandbox.hortonworks.com:6667", config.getBrokers().get(0).getConnectionString());
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java
index d89695d..4449d37 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java
@@ -36,10 +36,12 @@ package org.apache.kylin.streaming.kafka;
 
 import com.google.common.collect.Lists;
 import kafka.cluster.Broker;
+import kafka.consumer.ConsumerConfig;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
@@ -58,7 +60,7 @@ public class KafkaConsumerTest extends KafkaBaseTest {
     private static final int TOTAL_SEND_COUNT = 100;
 
     @Before
-    public void before() {
+    public void before() throws IOException {
         producer = new TestProducer(TOTAL_SEND_COUNT);
         producer.start();
     }
@@ -80,18 +82,11 @@ public class KafkaConsumerTest extends KafkaBaseTest {
 
     @Test
     public void test() throws InterruptedException {
-        TopicConfig topicConfig = new TopicConfig();
-        topicConfig.setTopic(TestConstants.TOPIC);
-        topicConfig.setBrokers(Collections.singletonList(TestConstants.BROKER));
-        ConsumerConfig consumerConfig = new ConsumerConfig();
-        consumerConfig.setBufferSize(64 * 1024);
-        consumerConfig.setMaxReadCount(1000);
-        consumerConfig.setTimeout(60 * 1000);
-        final TopicMeta kafkaTopicMeta = Requester.getKafkaTopicMeta(topicConfig, consumerConfig);
+        final TopicMeta kafkaTopicMeta = Requester.getKafkaTopicMeta(kafkaConfig);
         final ExecutorService executorService = Executors.newFixedThreadPool(kafkaTopicMeta.getPartitionIds().size());
         List<BlockingQueue<Stream>> queues = Lists.newArrayList();
         for (Integer partitionId : kafkaTopicMeta.getPartitionIds()) {
-            Consumer consumer = new Consumer(kafkaTopicMeta.getName(), partitionId, Lists.asList(TestConstants.BROKER, new Broker[0]), consumerConfig);
+            Consumer consumer = new Consumer(kafkaTopicMeta.getName(), partitionId, kafkaConfig.getBrokers(), kafkaConfig);
             queues.add(consumer.getStreamQueue());
             executorService.execute(consumer);
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java
index fd8cc63..694c1fd 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java
@@ -35,7 +35,6 @@
 package org.apache.kylin.streaming.kafka;
 
 import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -47,21 +46,9 @@ import static org.junit.Assert.*;
  */
 public class RequesterTest extends KafkaBaseTest {
 
-    private static TopicConfig topicConfig;
-    private static ConsumerConfig consumerConfig;
+    private static final String NON_EXISTED_TOPIC = "non_existent_topic";
 
-    private static final String UNEXISTED_TOPIC = "unexist_topic";
 
-    @BeforeClass
-    public static void beforeClass() {
-        topicConfig = new TopicConfig();
-        topicConfig.setTopic(TestConstants.TOPIC);
-        topicConfig.setBrokers(Collections.singletonList(TestConstants.BROKER));
-        consumerConfig = new ConsumerConfig();
-        consumerConfig.setBufferSize(64 * 1024);
-        consumerConfig.setMaxReadCount(1000);
-        consumerConfig.setTimeout(60 * 1000);
-    }
 
     @AfterClass
     public static void afterClass() {
@@ -69,16 +56,15 @@ public class RequesterTest extends KafkaBaseTest {
 
     @Test
     public void testTopicMeta() throws Exception {
-        TopicMeta kafkaTopicMeta = Requester.getKafkaTopicMeta(topicConfig, consumerConfig);
+        TopicMeta kafkaTopicMeta = Requester.getKafkaTopicMeta(kafkaConfig);
         assertNotNull(kafkaTopicMeta);
         assertEquals(2, kafkaTopicMeta.getPartitionIds().size());
-        assertEquals(topicConfig.getTopic(), kafkaTopicMeta.getName());
+        assertEquals(kafkaConfig.getTopic(), kafkaTopicMeta.getName());
 
-        TopicConfig anotherTopicConfig = new TopicConfig();
-        anotherTopicConfig.setBrokers(Collections.singletonList(TestConstants.BROKER));
-        anotherTopicConfig.setTopic(UNEXISTED_TOPIC);
+        KafkaConfig anotherTopicConfig = KafkaConfig.load(kafkaConfig);
+        anotherTopicConfig.setTopic(NON_EXISTED_TOPIC);
 
-        kafkaTopicMeta = Requester.getKafkaTopicMeta(anotherTopicConfig, consumerConfig);
+        kafkaTopicMeta = Requester.getKafkaTopicMeta(anotherTopicConfig);
         assertTrue(kafkaTopicMeta == null);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestConstants.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestConstants.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestConstants.java
deleted file mode 100644
index 85a6ba3..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestConstants.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import kafka.cluster.Broker;
-
-/**
- * Created by qianzhou on 2/16/15.
- */
-public class TestConstants {
-
-    public static final String TOPIC = "kafka_stream_test";
-    public static final String ZOOKEEPER = "sandbox.hortonworks.com:2181";
-    public static final Broker BROKER = new Broker(0, "sandbox.hortonworks.com", 6667);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java
index 54ad583..cd3b166 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java
@@ -34,12 +34,19 @@
 
 package org.apache.kylin.streaming.kafka;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import kafka.cluster.Broker;
 import kafka.javaapi.producer.Producer;
 import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+import java.io.IOException;
 import java.util.Properties;
 
 /**
@@ -49,7 +56,7 @@ public class TestProducer {
 
     private volatile boolean stopped = false;
 
-    private static final Logger logger = LoggerFactory.getLogger(TestConstants.class);
+    private static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
 
     private final int sendCount;
 
@@ -57,9 +64,19 @@ public class TestProducer {
         this.sendCount = sendCount;
     }
 
-    public void start() {
+    public void start() throws IOException {
+        final Properties properties = new Properties();
+        properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
+        final KafkaConfig kafkaConfig = KafkaConfig.load(properties);
+
         Properties props = new Properties();
-        props.put("metadata.broker.list", TestConstants.BROKER.getConnectionString());
+        props.put("metadata.broker.list", StringUtils.join(Iterators.transform(kafkaConfig.getBrokers().iterator(), new Function<Broker, String>() {
+            @Nullable
+            @Override
+            public String apply(@Nullable Broker broker) {
+                return broker.getConnectionString();
+            }
+        }), ","));
         props.put("serializer.class", "kafka.serializer.StringEncoder");
         props.put("request.required.acks", "1");
         ProducerConfig config = new ProducerConfig(props);
@@ -70,7 +87,7 @@ public class TestProducer {
             public void run() {
                 int count = 0;
                 while (!stopped && count < sendCount) {
-                    final KeyedMessage<String, String> message = new KeyedMessage<>(TestConstants.TOPIC, "current time is:" + System.currentTimeMillis());
+                    final KeyedMessage<String, String> message = new KeyedMessage<>(kafkaConfig.getTopic(), "current time is:" + System.currentTimeMillis());
                     producer.send(message);
                     count++;
                     try {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/test/resources/kafka_streaming_test/kafka.properties
----------------------------------------------------------------------
diff --git a/streaming/src/test/resources/kafka_streaming_test/kafka.properties b/streaming/src/test/resources/kafka_streaming_test/kafka.properties
new file mode 100644
index 0000000..ae762e3
--- /dev/null
+++ b/streaming/src/test/resources/kafka_streaming_test/kafka.properties
@@ -0,0 +1,10 @@
+zookeeper=sandbox.hortonworks.com:2181
+
+brokers=sandbox.hortonworks.com:6667
+
+consumer.timeout=60000
+consumer.bufferSize=65536
+consumer.maxReadCount=1000
+
+topic=kafka_stream_test
+partitionId=0