You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/03 11:08:15 UTC
[19/26] incubator-kylin git commit: refactor
refactor
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/58a6f73f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/58a6f73f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/58a6f73f
Branch: refs/heads/streaming
Commit: 58a6f73f530a5008ee565c863b0473fef5d34200
Parents: c038cb4
Author: qianhao.zhou <qi...@ebay.com>
Authored: Mon Mar 2 18:10:48 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Mon Mar 2 18:10:48 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/streaming/kafka/Consumer.java | 14 +-
.../kylin/streaming/kafka/ConsumerConfig.java | 72 ---------
.../kylin/streaming/kafka/KafkaConfig.java | 151 +++++++++++++++++++
.../apache/kylin/streaming/kafka/Requester.java | 26 ++--
.../kylin/streaming/kafka/TopicConfig.java | 66 --------
.../kylin/streaming/kafka/KafkaBaseTest.java | 11 +-
.../kylin/streaming/kafka/KafkaConfigTest.java | 64 ++++++++
.../streaming/kafka/KafkaConsumerTest.java | 15 +-
.../kylin/streaming/kafka/RequesterTest.java | 26 +---
.../kylin/streaming/kafka/TestConstants.java | 48 ------
.../kylin/streaming/kafka/TestProducer.java | 25 ++-
.../kafka_streaming_test/kafka.properties | 10 ++
12 files changed, 286 insertions(+), 242 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
index c825d4b..074ef01 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
@@ -56,20 +56,20 @@ public class Consumer implements Runnable {
private String topic;
private int partitionId;
- private ConsumerConfig consumerConfig;
+ private KafkaConfig kafkaConfig;
private List<Broker> replicaBrokers;
private AtomicLong offset = new AtomicLong();
private BlockingQueue<Stream> streamQueue;
private Logger logger;
- public Consumer(String topic, int partitionId, List<Broker> initialBrokers, ConsumerConfig consumerConfig) {
+ public Consumer(String topic, int partitionId, List<Broker> initialBrokers, KafkaConfig kafkaConfig) {
this.topic = topic;
this.partitionId = partitionId;
- this.consumerConfig = consumerConfig;
+ this.kafkaConfig = kafkaConfig;
this.replicaBrokers = initialBrokers;
logger = LoggerFactory.getLogger("KafkaConsumer_" + topic + "_" + partitionId);
- streamQueue = new ArrayBlockingQueue<Stream>(consumerConfig.getMaxReadCount());
+ streamQueue = new ArrayBlockingQueue<Stream>(kafkaConfig.getMaxReadCount());
}
public BlockingQueue<Stream> getStreamQueue() {
@@ -77,7 +77,7 @@ public class Consumer implements Runnable {
}
private Broker getLeadBroker() {
- final PartitionMetadata partitionMetadata = Requester.getPartitionMetadata(topic, partitionId, replicaBrokers, consumerConfig);
+ final PartitionMetadata partitionMetadata = Requester.getPartitionMetadata(topic, partitionId, replicaBrokers, kafkaConfig);
if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
replicaBrokers = partitionMetadata.replicas();
return partitionMetadata.leader();
@@ -94,9 +94,9 @@ public class Consumer implements Runnable {
logger.warn("cannot find lead broker");
continue;
}
- final long lastOffset = Requester.getLastOffset(topic, partitionId, OffsetRequest.EarliestTime(), leadBroker, consumerConfig);
+ final long lastOffset = Requester.getLastOffset(topic, partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaConfig);
offset.set(lastOffset);
- final FetchResponse fetchResponse = Requester.fetchResponse(topic, partitionId, offset.get(), leadBroker, consumerConfig);
+ final FetchResponse fetchResponse = Requester.fetchResponse(topic, partitionId, offset.get(), leadBroker, kafkaConfig);
if (fetchResponse.errorCode(topic, partitionId) != 0) {
logger.warn("fetch response offset:" + offset.get() + " errorCode:" + fetchResponse.errorCode(topic, partitionId));
continue;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/main/java/org/apache/kylin/streaming/kafka/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/ConsumerConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/ConsumerConfig.java
deleted file mode 100644
index 3bdbd13..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/ConsumerConfig.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-/**
- * Created by qianzhou on 2/15/15.
- */
-public class ConsumerConfig {
-
- private int timeout;
-
- private int maxReadCount;
-
- private int bufferSize;
-
- public int getTimeout() {
- return timeout;
- }
-
- public void setTimeout(int timeout) {
- this.timeout = timeout;
- }
-
- public int getMaxReadCount() {
- return maxReadCount;
- }
-
- public void setMaxReadCount(int maxReadCount) {
- this.maxReadCount = maxReadCount;
- }
-
- public int getBufferSize() {
- return bufferSize;
- }
-
- public void setBufferSize(int bufferSize) {
- this.bufferSize = bufferSize;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/main/java/org/apache/kylin/streaming/kafka/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/KafkaConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/KafkaConfig.java
new file mode 100644
index 0000000..82513eb
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/kafka/KafkaConfig.java
@@ -0,0 +1,151 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming.kafka;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import kafka.cluster.Broker;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Created by qianzhou on 3/2/15. Mutable holder for the broker list, zookeeper address, topic, partition id and consumer settings; built via the static load(...) factories.
+ */
+public class KafkaConfig {
+
+ private List<Broker> brokers; // seed brokers; Requester.getKafkaTopicMeta iterates these to look up topic metadata
+
+ private String zookeeper; // value of the "zookeeper" property (host:port in the test properties)
+
+ private String topic; // value of the "topic" property
+
+ private int timeout; // handed to SimpleConsumer as its timeout argument; presumably milliseconds — TODO confirm
+
+ private int maxReadCount; // used both as Consumer's queue capacity and as the fetch size in Requester.fetchResponse
+
+ private int bufferSize; // handed to SimpleConsumer as its buffer size argument
+
+ private int partitionId; // value of the "partitionId" property
+
+ public int getTimeout() {
+ return timeout;
+ }
+
+ public void setTimeout(int timeout) {
+ this.timeout = timeout;
+ }
+
+ public int getMaxReadCount() {
+ return maxReadCount;
+ }
+
+ public void setMaxReadCount(int maxReadCount) {
+ this.maxReadCount = maxReadCount;
+ }
+
+ public int getBufferSize() {
+ return bufferSize;
+ }
+
+ public void setBufferSize(int bufferSize) {
+ this.bufferSize = bufferSize;
+ }
+
+ public List<Broker> getBrokers() { // returns the internal list itself, not a copy
+ return brokers;
+ }
+
+ public void setBrokers(List<Broker> brokers) { // stores the caller's list reference as-is
+ this.brokers = brokers;
+ }
+
+ public String getZookeeper() {
+ return zookeeper;
+ }
+
+ public void setZookeeper(String zookeeper) {
+ this.zookeeper = zookeeper;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public int getPartitionId() {
+ return partitionId;
+ }
+
+ public void setPartitionId(int partitionId) {
+ this.partitionId = partitionId;
+ }
+
+ public static KafkaConfig load(KafkaConfig config) { // copy factory: returns a new KafkaConfig with every field taken from config
+ KafkaConfig result = new KafkaConfig();
+ result.setBufferSize(config.getBufferSize());
+ result.setMaxReadCount(config.getMaxReadCount());
+ result.setTimeout(config.getTimeout());
+ result.setTopic(config.getTopic());
+ result.setZookeeper(config.getZookeeper());
+ result.setPartitionId(config.getPartitionId());
+ result.setBrokers(config.getBrokers()); // shallow: the copy and the source share one broker list instance
+ return result;
+ }
+
+ public static KafkaConfig load(Properties properties) { // builds a config from properties; no defaults are applied for absent keys
+ Preconditions.checkNotNull(properties); // fails fast with NullPointerException when properties is null
+ KafkaConfig result = new KafkaConfig();
+ result.setBufferSize(Integer.parseInt(properties.getProperty("consumer.bufferSize"))); // parseInt throws NumberFormatException if the key is absent or not an integer
+ result.setMaxReadCount(Integer.parseInt(properties.getProperty("consumer.maxReadCount"))); // same NumberFormatException behaviour
+ result.setTimeout(Integer.parseInt(properties.getProperty("consumer.timeout"))); // same NumberFormatException behaviour
+ result.setTopic(properties.getProperty("topic")); // left null when the key is absent; not validated here
+ result.setZookeeper(properties.getProperty("zookeeper")); // left null when the key is absent; not validated here
+ result.setPartitionId(Integer.parseInt(properties.getProperty("partitionId"))); // same NumberFormatException behaviour
+
+ int id = 0; // NOTE(review): synthetic ids in listing order, presumably not the cluster's real broker ids — confirm
+ List<Broker> brokers = Lists.newArrayList();
+ for (String str: properties.getProperty("brokers").split(",")) { // comma-separated entries, not trimmed; NullPointerException if "brokers" is absent
+ final String[] split = str.split(":"); // each entry is expected as host:port
+ final Broker broker = new Broker(id++, split[0], Integer.parseInt(split[1])); // entry without ":port" -> ArrayIndexOutOfBoundsException; non-numeric port -> NumberFormatException
+ brokers.add(broker);
+ }
+ result.setBrokers(brokers);
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
index f4cdd8e..8811695 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
@@ -58,11 +58,11 @@ public final class Requester {
private static final Logger logger = LoggerFactory.getLogger(Requester.class);
- public static TopicMeta getKafkaTopicMeta(TopicConfig topicConfig, ConsumerConfig consumerConfig) {
+ public static TopicMeta getKafkaTopicMeta(KafkaConfig kafkaConfig) {
SimpleConsumer consumer;
- for (Broker broker : topicConfig.getBrokers()) {
- consumer = new SimpleConsumer(broker.host(), broker.port(), consumerConfig.getTimeout(), consumerConfig.getBufferSize(), "topic_meta_lookup");
- List<String> topics = Collections.singletonList(topicConfig.getTopic());
+ for (Broker broker : kafkaConfig.getBrokers()) {
+ consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), "topic_meta_lookup");
+ List<String> topics = Collections.singletonList(kafkaConfig.getTopic());
TopicMetadataRequest req = new TopicMetadataRequest(topics);
TopicMetadataResponse resp = consumer.send(req);
final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
@@ -80,16 +80,16 @@ public final class Requester {
return partitionMetadata.partitionId();
}
});
- return new TopicMeta(topicConfig.getTopic(), partitionIds);
+ return new TopicMeta(kafkaConfig.getTopic(), partitionIds);
}
- logger.debug("cannot find topic:" + topicConfig.getTopic());
+ logger.debug("cannot find topic:" + kafkaConfig.getTopic());
return null;
}
- public static PartitionMetadata getPartitionMetadata(String topic, int partitionId, List<Broker> brokers, ConsumerConfig consumerConfig) {
+ public static PartitionMetadata getPartitionMetadata(String topic, int partitionId, List<Broker> brokers, KafkaConfig kafkaConfig) {
SimpleConsumer consumer;
for (Broker broker : brokers) {
- consumer = new SimpleConsumer(broker.host(), broker.port(), consumerConfig.getTimeout(), consumerConfig.getBufferSize(), "topic_meta_lookup");
+ consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), "topic_meta_lookup");
List<String> topics = Collections.singletonList(topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
TopicMetadataResponse resp = consumer.send(req);
@@ -113,20 +113,20 @@ public final class Requester {
return null;
}
- public static FetchResponse fetchResponse(String topic, int partitionId, long offset, Broker broker, ConsumerConfig consumerConfig) {
+ public static FetchResponse fetchResponse(String topic, int partitionId, long offset, Broker broker, KafkaConfig kafkaConfig) {
final String clientName = "client_" + topic + "_" + partitionId;
- SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), consumerConfig.getTimeout(), consumerConfig.getBufferSize(), clientName);
+ SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), clientName);
kafka.api.FetchRequest req = new FetchRequestBuilder()
.clientId(clientName)
- .addFetch(topic, partitionId, offset, consumerConfig.getMaxReadCount()) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
+ .addFetch(topic, partitionId, offset, kafkaConfig.getMaxReadCount()) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
.build();
return consumer.fetch(req);
}
public static long getLastOffset(String topic, int partitionId,
- long whichTime, Broker broker, ConsumerConfig consumerConfig) {
+ long whichTime, Broker broker, KafkaConfig kafkaConfig) {
String clientName = "client_" + topic + "_" + partitionId;
- SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), consumerConfig.getTimeout(), consumerConfig.getBufferSize(), clientName);
+ SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), clientName);
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicConfig.java
deleted file mode 100644
index 4aa9671..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicConfig.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import kafka.cluster.Broker;
-
-import java.util.List;
-
-/**
- * Created by qianzhou on 2/16/15.
- */
-public class TopicConfig {
-
- private String topic;
-
- private List<Broker> brokers;
-
- public String getTopic() {
- return topic;
- }
-
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
- public List<Broker> getBrokers() {
- return brokers;
- }
-
- public void setBrokers(List<Broker> brokers) {
- this.brokers = brokers;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java
index a1f9b87..537a15d 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java
@@ -41,6 +41,7 @@ import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.Properties;
/**
@@ -52,9 +53,15 @@ public abstract class KafkaBaseTest {
protected static ZkClient zkClient;
+ protected static KafkaConfig kafkaConfig;
+
@BeforeClass
- public static void beforeClass() {
- zkClient = new ZkClient(TestConstants.ZOOKEEPER);
+ public static void beforeClass() throws IOException {
+ final Properties properties = new Properties();
+ properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
+ kafkaConfig = KafkaConfig.load(properties);
+
+ zkClient = new ZkClient(kafkaConfig.getZookeeper());
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConfigTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConfigTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConfigTest.java
new file mode 100644
index 0000000..3c9bd87
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConfigTest.java
@@ -0,0 +1,64 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming.kafka;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Created by qianzhou on 3/2/15. Checks that KafkaConfig.load(Properties) parses kafka_streaming_test/kafka.properties into the expected field values.
+ */
+public class KafkaConfigTest {
+
+ @Test
+ public void test() throws IOException { // pure parsing check: only loads the properties file and inspects the resulting config
+ final Properties properties = new Properties();
+ properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties")); // looked up on the system class path; the stream is not closed here
+ KafkaConfig config = KafkaConfig.load(properties);
+ assertEquals(1000, config.getMaxReadCount()); // key: consumer.maxReadCount
+ assertEquals(65536, config.getBufferSize()); // key: consumer.bufferSize
+ assertEquals(60000, config.getTimeout()); // key: consumer.timeout
+ assertEquals("sandbox.hortonworks.com:2181", config.getZookeeper()); // key: zookeeper
+ assertEquals("kafka_stream_test", config.getTopic()); // key: topic
+ assertEquals(0, config.getPartitionId()); // key: partitionId
+ assertEquals(1, config.getBrokers().size()); // key: brokers holds a single host:port entry
+ assertEquals("sandbox.hortonworks.com:6667", config.getBrokers().get(0).getConnectionString()); // host and port of that entry round-trip through Broker
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java
index d89695d..4449d37 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java
@@ -36,10 +36,12 @@ package org.apache.kylin.streaming.kafka;
import com.google.common.collect.Lists;
import kafka.cluster.Broker;
+import kafka.consumer.ConsumerConfig;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
@@ -58,7 +60,7 @@ public class KafkaConsumerTest extends KafkaBaseTest {
private static final int TOTAL_SEND_COUNT = 100;
@Before
- public void before() {
+ public void before() throws IOException {
producer = new TestProducer(TOTAL_SEND_COUNT);
producer.start();
}
@@ -80,18 +82,11 @@ public class KafkaConsumerTest extends KafkaBaseTest {
@Test
public void test() throws InterruptedException {
- TopicConfig topicConfig = new TopicConfig();
- topicConfig.setTopic(TestConstants.TOPIC);
- topicConfig.setBrokers(Collections.singletonList(TestConstants.BROKER));
- ConsumerConfig consumerConfig = new ConsumerConfig();
- consumerConfig.setBufferSize(64 * 1024);
- consumerConfig.setMaxReadCount(1000);
- consumerConfig.setTimeout(60 * 1000);
- final TopicMeta kafkaTopicMeta = Requester.getKafkaTopicMeta(topicConfig, consumerConfig);
+ final TopicMeta kafkaTopicMeta = Requester.getKafkaTopicMeta(kafkaConfig);
final ExecutorService executorService = Executors.newFixedThreadPool(kafkaTopicMeta.getPartitionIds().size());
List<BlockingQueue<Stream>> queues = Lists.newArrayList();
for (Integer partitionId : kafkaTopicMeta.getPartitionIds()) {
- Consumer consumer = new Consumer(kafkaTopicMeta.getName(), partitionId, Lists.asList(TestConstants.BROKER, new Broker[0]), consumerConfig);
+ Consumer consumer = new Consumer(kafkaTopicMeta.getName(), partitionId, kafkaConfig.getBrokers(), kafkaConfig);
queues.add(consumer.getStreamQueue());
executorService.execute(consumer);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java
index fd8cc63..694c1fd 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java
@@ -35,7 +35,6 @@
package org.apache.kylin.streaming.kafka;
import org.junit.AfterClass;
-import org.junit.BeforeClass;
import org.junit.Test;
import java.util.Collections;
@@ -47,21 +46,9 @@ import static org.junit.Assert.*;
*/
public class RequesterTest extends KafkaBaseTest {
- private static TopicConfig topicConfig;
- private static ConsumerConfig consumerConfig;
+ private static final String NON_EXISTED_TOPIC = "non_existent_topic";
- private static final String UNEXISTED_TOPIC = "unexist_topic";
- @BeforeClass
- public static void beforeClass() {
- topicConfig = new TopicConfig();
- topicConfig.setTopic(TestConstants.TOPIC);
- topicConfig.setBrokers(Collections.singletonList(TestConstants.BROKER));
- consumerConfig = new ConsumerConfig();
- consumerConfig.setBufferSize(64 * 1024);
- consumerConfig.setMaxReadCount(1000);
- consumerConfig.setTimeout(60 * 1000);
- }
@AfterClass
public static void afterClass() {
@@ -69,16 +56,15 @@ public class RequesterTest extends KafkaBaseTest {
@Test
public void testTopicMeta() throws Exception {
- TopicMeta kafkaTopicMeta = Requester.getKafkaTopicMeta(topicConfig, consumerConfig);
+ TopicMeta kafkaTopicMeta = Requester.getKafkaTopicMeta(kafkaConfig);
assertNotNull(kafkaTopicMeta);
assertEquals(2, kafkaTopicMeta.getPartitionIds().size());
- assertEquals(topicConfig.getTopic(), kafkaTopicMeta.getName());
+ assertEquals(kafkaConfig.getTopic(), kafkaTopicMeta.getName());
- TopicConfig anotherTopicConfig = new TopicConfig();
- anotherTopicConfig.setBrokers(Collections.singletonList(TestConstants.BROKER));
- anotherTopicConfig.setTopic(UNEXISTED_TOPIC);
+ KafkaConfig anotherTopicConfig = KafkaConfig.load(kafkaConfig);
+ anotherTopicConfig.setTopic(NON_EXISTED_TOPIC);
- kafkaTopicMeta = Requester.getKafkaTopicMeta(anotherTopicConfig, consumerConfig);
+ kafkaTopicMeta = Requester.getKafkaTopicMeta(anotherTopicConfig);
assertTrue(kafkaTopicMeta == null);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestConstants.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestConstants.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestConstants.java
deleted file mode 100644
index 85a6ba3..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestConstants.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import kafka.cluster.Broker;
-
-/**
- * Created by qianzhou on 2/16/15.
- */
-public class TestConstants {
-
- public static final String TOPIC = "kafka_stream_test";
- public static final String ZOOKEEPER = "sandbox.hortonworks.com:2181";
- public static final Broker BROKER = new Broker(0, "sandbox.hortonworks.com", 6667);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java
index 54ad583..cd3b166 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java
@@ -34,12 +34,19 @@
package org.apache.kylin.streaming.kafka;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import kafka.cluster.Broker;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+import java.io.IOException;
import java.util.Properties;
/**
@@ -49,7 +56,7 @@ public class TestProducer {
private volatile boolean stopped = false;
- private static final Logger logger = LoggerFactory.getLogger(TestConstants.class);
+ private static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
private final int sendCount;
@@ -57,9 +64,19 @@ public class TestProducer {
this.sendCount = sendCount;
}
- public void start() {
+ public void start() throws IOException {
+ final Properties properties = new Properties();
+ properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
+ final KafkaConfig kafkaConfig = KafkaConfig.load(properties);
+
Properties props = new Properties();
- props.put("metadata.broker.list", TestConstants.BROKER.getConnectionString());
+ props.put("metadata.broker.list", StringUtils.join(Iterators.transform(kafkaConfig.getBrokers().iterator(), new Function<Broker, String>() {
+ @Nullable
+ @Override
+ public String apply(@Nullable Broker broker) {
+ return broker.getConnectionString();
+ }
+ }), ","));
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
@@ -70,7 +87,7 @@ public class TestProducer {
public void run() {
int count = 0;
while (!stopped && count < sendCount) {
- final KeyedMessage<String, String> message = new KeyedMessage<>(TestConstants.TOPIC, "current time is:" + System.currentTimeMillis());
+ final KeyedMessage<String, String> message = new KeyedMessage<>(kafkaConfig.getTopic(), "current time is:" + System.currentTimeMillis());
producer.send(message);
count++;
try {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/58a6f73f/streaming/src/test/resources/kafka_streaming_test/kafka.properties
----------------------------------------------------------------------
diff --git a/streaming/src/test/resources/kafka_streaming_test/kafka.properties b/streaming/src/test/resources/kafka_streaming_test/kafka.properties
new file mode 100644
index 0000000..ae762e3
--- /dev/null
+++ b/streaming/src/test/resources/kafka_streaming_test/kafka.properties
@@ -0,0 +1,10 @@
+zookeeper=sandbox.hortonworks.com:2181
+
+brokers=sandbox.hortonworks.com:6667
+
+consumer.timeout=60000
+consumer.bufferSize=65536
+consumer.maxReadCount=1000
+
+topic=kafka_stream_test
+partitionId=0