You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/03 11:08:21 UTC
[25/26] incubator-kylin git commit: refactor
refactor
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/0970d76f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/0970d76f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/0970d76f
Branch: refs/heads/streaming
Commit: 0970d76fd4e9d6b92e684cc64c5e97676f986132
Parents: 1f2fb28
Author: qianhao.zhou <qi...@ebay.com>
Authored: Tue Mar 3 16:06:58 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Tue Mar 3 16:06:58 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/streaming/KafkaConfig.java | 151 +++++++++++++++++++
.../apache/kylin/streaming/KafkaConsumer.java | 133 ++++++++++++++++
.../org/apache/kylin/streaming/Requester.java | 146 ++++++++++++++++++
.../java/org/apache/kylin/streaming/Stream.java | 57 +++++++
.../apache/kylin/streaming/StreamBuilder.java | 95 ++++++++++++
.../org/apache/kylin/streaming/TopicMeta.java | 63 ++++++++
.../apache/kylin/streaming/kafka/Consumer.java | 120 ---------------
.../kylin/streaming/kafka/KafkaConfig.java | 151 -------------------
.../apache/kylin/streaming/kafka/Requester.java | 146 ------------------
.../apache/kylin/streaming/kafka/Stream.java | 57 -------
.../kylin/streaming/kafka/StreamBuilder.java | 94 ------------
.../apache/kylin/streaming/kafka/TopicMeta.java | 63 --------
.../apache/kylin/streaming/KafkaBaseTest.java | 78 ++++++++++
.../apache/kylin/streaming/KafkaConfigTest.java | 65 ++++++++
.../kylin/streaming/KafkaConsumerTest.java | 98 ++++++++++++
.../apache/kylin/streaming/RequesterTest.java | 71 +++++++++
.../apache/kylin/streaming/TestProducer.java | 114 ++++++++++++++
.../kylin/streaming/kafka/KafkaBaseTest.java | 77 ----------
.../kylin/streaming/kafka/KafkaConfigTest.java | 64 --------
.../streaming/kafka/KafkaConsumerTest.java | 101 -------------
.../kylin/streaming/kafka/RequesterTest.java | 70 ---------
.../kylin/streaming/kafka/TestProducer.java | 115 --------------
22 files changed, 1071 insertions(+), 1058 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
new file mode 100644
index 0000000..7d0cd6b
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
@@ -0,0 +1,151 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import kafka.cluster.Broker;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Created by qianzhou on 3/2/15.
+ */
+public class KafkaConfig {
+
+ private List<Broker> brokers;
+
+ private String zookeeper;
+
+ private String topic;
+
+ private int timeout;
+
+ private int maxReadCount;
+
+ private int bufferSize;
+
+ private int partitionId;
+
+ public int getTimeout() {
+ return timeout;
+ }
+
+ public void setTimeout(int timeout) {
+ this.timeout = timeout;
+ }
+
+ public int getMaxReadCount() {
+ return maxReadCount;
+ }
+
+ public void setMaxReadCount(int maxReadCount) {
+ this.maxReadCount = maxReadCount;
+ }
+
+ public int getBufferSize() {
+ return bufferSize;
+ }
+
+ public void setBufferSize(int bufferSize) {
+ this.bufferSize = bufferSize;
+ }
+
+ public List<Broker> getBrokers() {
+ return brokers;
+ }
+
+ public void setBrokers(List<Broker> brokers) {
+ this.brokers = brokers;
+ }
+
+ public String getZookeeper() {
+ return zookeeper;
+ }
+
+ public void setZookeeper(String zookeeper) {
+ this.zookeeper = zookeeper;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public int getPartitionId() {
+ return partitionId;
+ }
+
+ public void setPartitionId(int partitionId) {
+ this.partitionId = partitionId;
+ }
+
+ public static KafkaConfig load(KafkaConfig config) {
+ KafkaConfig result = new KafkaConfig();
+ result.setBufferSize(config.getBufferSize());
+ result.setMaxReadCount(config.getMaxReadCount());
+ result.setTimeout(config.getTimeout());
+ result.setTopic(config.getTopic());
+ result.setZookeeper(config.getZookeeper());
+ result.setPartitionId(config.getPartitionId());
+ result.setBrokers(config.getBrokers());
+ return result;
+ }
+
+ public static KafkaConfig load(Properties properties) {
+ Preconditions.checkNotNull(properties);
+ KafkaConfig result = new KafkaConfig();
+ result.setBufferSize(Integer.parseInt(properties.getProperty("consumer.bufferSize")));
+ result.setMaxReadCount(Integer.parseInt(properties.getProperty("consumer.maxReadCount")));
+ result.setTimeout(Integer.parseInt(properties.getProperty("consumer.timeout")));
+ result.setTopic(properties.getProperty("topic"));
+ result.setZookeeper(properties.getProperty("zookeeper"));
+ result.setPartitionId(Integer.parseInt(properties.getProperty("partitionId")));
+
+ int id = 0;
+ List<Broker> brokers = Lists.newArrayList();
+ for (String str: properties.getProperty("brokers").split(",")) {
+ final String[] split = str.split(":");
+ final Broker broker = new Broker(id++, split[0], Integer.parseInt(split[1]));
+ brokers.add(broker);
+ }
+ result.setBrokers(brokers);
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
new file mode 100644
index 0000000..e45b6e4
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
@@ -0,0 +1,133 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import kafka.api.OffsetRequest;
+import kafka.cluster.Broker;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.PartitionMetadata;
+import kafka.message.MessageAndOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Created by qianzhou on 2/15/15.
+ */
+public class KafkaConsumer implements Runnable {
+
+ private String topic;
+ private int partitionId;
+
+ private KafkaConfig kafkaConfig;
+ private List<Broker> replicaBrokers;
+ private AtomicLong offset = new AtomicLong();
+ private BlockingQueue<Stream> streamQueue;
+
+ private Logger logger;
+
+ public KafkaConsumer(String topic, int partitionId, List<Broker> initialBrokers, KafkaConfig kafkaConfig) {
+ this.topic = topic;
+ this.partitionId = partitionId;
+ this.kafkaConfig = kafkaConfig;
+ this.replicaBrokers = initialBrokers;
+ logger = LoggerFactory.getLogger("KafkaConsumer_" + topic + "_" + partitionId);
+ streamQueue = new ArrayBlockingQueue<Stream>(kafkaConfig.getMaxReadCount());
+ }
+
+ public BlockingQueue<Stream> getStreamQueue() {
+ return streamQueue;
+ }
+
+ private Broker getLeadBroker() {
+ final PartitionMetadata partitionMetadata = Requester.getPartitionMetadata(topic, partitionId, replicaBrokers, kafkaConfig);
+ if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
+ replicaBrokers = partitionMetadata.replicas();
+ return partitionMetadata.leader();
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ Broker leadBroker = getLeadBroker();
+ if (leadBroker == null) {
+ logger.warn("cannot find lead broker");
+ } else {
+ final long lastOffset = Requester.getLastOffset(topic, partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaConfig);
+ offset.set(lastOffset);
+ }
+ while (true) {
+ if (leadBroker == null) {
+ leadBroker = getLeadBroker();
+ }
+ if (leadBroker == null) {
+ logger.warn("cannot find lead broker");
+ continue;
+ }
+
+ final FetchResponse fetchResponse = Requester.fetchResponse(topic, partitionId, offset.get(), leadBroker, kafkaConfig);
+ if (fetchResponse.errorCode(topic, partitionId) != 0) {
+ logger.warn("fetch response offset:" + offset.get() + " errorCode:" + fetchResponse.errorCode(topic, partitionId));
+ continue;
+ }
+ for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partitionId)) {
+ final ByteBuffer payload = messageAndOffset.message().payload();
+ //TODO use ByteBuffer maybe
+ byte[] bytes = new byte[payload.limit()];
+ payload.get(bytes);
+ logger.debug("get message offset:" + messageAndOffset.offset());
+ try {
+ streamQueue.put(new Stream(System.currentTimeMillis(), bytes));
+ } catch (InterruptedException e) {
+ logger.error("error put streamQueue", e);
+ break;
+ }
+ offset.incrementAndGet();
+ }
+ }
+ } catch (Exception e) {
+ logger.error("consumer has encountered an error", e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/Requester.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/Requester.java b/streaming/src/main/java/org/apache/kylin/streaming/Requester.java
new file mode 100644
index 0000000..79332af
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/Requester.java
@@ -0,0 +1,146 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import kafka.api.FetchRequestBuilder;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.cluster.Broker;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.*;
+import kafka.javaapi.consumer.SimpleConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by qianzhou on 2/15/15.
+ */
+public final class Requester {
+
+ private static final Logger logger = LoggerFactory.getLogger(Requester.class);
+
+ public static TopicMeta getKafkaTopicMeta(KafkaConfig kafkaConfig) {
+ SimpleConsumer consumer;
+ for (Broker broker : kafkaConfig.getBrokers()) {
+ consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), "topic_meta_lookup");
+ List<String> topics = Collections.singletonList(kafkaConfig.getTopic());
+ TopicMetadataRequest req = new TopicMetadataRequest(topics);
+ TopicMetadataResponse resp = consumer.send(req);
+ final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
+ if (topicMetadatas.size() != 1) {
+ break;
+ }
+ final TopicMetadata topicMetadata = topicMetadatas.get(0);
+ if (topicMetadata.errorCode() != 0) {
+ break;
+ }
+ List<Integer> partitionIds = Lists.transform(topicMetadata.partitionsMetadata(), new Function<PartitionMetadata, Integer>() {
+ @Nullable
+ @Override
+ public Integer apply(PartitionMetadata partitionMetadata) {
+ return partitionMetadata.partitionId();
+ }
+ });
+ return new TopicMeta(kafkaConfig.getTopic(), partitionIds);
+ }
+ logger.debug("cannot find topic:" + kafkaConfig.getTopic());
+ return null;
+ }
+
+ public static PartitionMetadata getPartitionMetadata(String topic, int partitionId, List<Broker> brokers, KafkaConfig kafkaConfig) {
+ SimpleConsumer consumer;
+ for (Broker broker : brokers) {
+ consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), "topic_meta_lookup");
+ List<String> topics = Collections.singletonList(topic);
+ TopicMetadataRequest req = new TopicMetadataRequest(topics);
+ TopicMetadataResponse resp = consumer.send(req);
+ final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
+ if (topicMetadatas.size() != 1) {
+ logger.warn("invalid topicMetadata size:" + topicMetadatas.size());
+ break;
+ }
+ final TopicMetadata topicMetadata = topicMetadatas.get(0);
+ if (topicMetadata.errorCode() != 0) {
+ logger.warn("fetching topicMetadata with errorCode:" + topicMetadata.errorCode());
+ break;
+ }
+ for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
+ if (partitionMetadata.partitionId() == partitionId) {
+ return partitionMetadata;
+ }
+ }
+ }
+ logger.debug("cannot find PartitionMetadata, topic:" + topic + " partitionId:" + partitionId);
+ return null;
+ }
+
+ public static FetchResponse fetchResponse(String topic, int partitionId, long offset, Broker broker, KafkaConfig kafkaConfig) {
+ final String clientName = "client_" + topic + "_" + partitionId;
+ SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), clientName);
+ kafka.api.FetchRequest req = new FetchRequestBuilder()
+ .clientId(clientName)
+ .addFetch(topic, partitionId, offset, kafkaConfig.getMaxReadCount()) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
+ .build();
+ return consumer.fetch(req);
+ }
+
+ public static long getLastOffset(String topic, int partitionId,
+ long whichTime, Broker broker, KafkaConfig kafkaConfig) {
+ String clientName = "client_" + topic + "_" + partitionId;
+ SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), clientName);
+ TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
+ Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
+ requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
+ kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
+ requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
+ OffsetResponse response = consumer.getOffsetsBefore(request);
+
+ if (response.hasError()) {
+ System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partitionId));
+ return 0;
+ }
+ long[] offsets = response.offsets(topic, partitionId);
+ return offsets[0];
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/Stream.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/Stream.java b/streaming/src/main/java/org/apache/kylin/streaming/Stream.java
new file mode 100644
index 0000000..3cb43b6
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/Stream.java
@@ -0,0 +1,57 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+/**
+ * Created by qianzhou on 2/15/15.
+ */
+public class Stream {
+
+ private long timestamp;
+ private byte[] rawData;
+
+ public Stream(long timestamp, byte[] rawData) {
+ this.timestamp = timestamp;
+ this.rawData = rawData;
+ }
+
+ public byte[] getRawData() {
+ return rawData;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
new file mode 100644
index 0000000..005c255
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -0,0 +1,95 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Created by qianzhou on 2/17/15.
+ */
+public abstract class StreamBuilder implements Runnable {
+
+ private static final Logger logger = LoggerFactory.getLogger(StreamBuilder.class);
+
+ private static final int BATCH_BUILD_BYTES_THRESHOLD = 64 * 1024;
+ private static final int BATCH_BUILD_INTERVAL_THRESHOLD = 5 * 60 * 1000;
+
+ private BlockingQueue<Stream> streamQueue;
+ private long lastBuildTime = System.currentTimeMillis();
+ private int bytesTotal = 0;
+
+ public StreamBuilder(BlockingQueue<Stream> streamQueue) {
+ this.streamQueue = streamQueue;
+ }
+
+ protected abstract void build(List<Stream> streamsToBuild);
+
+ private void buildStream(List<Stream> streams) {
+ build(streams);
+ clearCounter();
+ }
+
+ private void clearCounter() {
+ lastBuildTime = System.currentTimeMillis();
+ bytesTotal = 0;
+ }
+
+ @Override
+ public void run() {
+ try {
+ List<Stream> streamToBuild = Lists.newArrayList();
+ clearCounter();
+ while (true) {
+ final Stream stream = streamQueue.take();
+ streamToBuild.add(stream);
+ bytesTotal += stream.getRawData().length;
+ if (bytesTotal >= BATCH_BUILD_BYTES_THRESHOLD) {
+ buildStream(streamToBuild);
+ } else if ((System.currentTimeMillis() - lastBuildTime) > BATCH_BUILD_INTERVAL_THRESHOLD) {
+ buildStream(streamToBuild);
+ } else {
+ continue;
+ }
+ }
+ } catch (InterruptedException e) {
+ logger.error("StreamBuilder has been interrupted", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/TopicMeta.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/TopicMeta.java b/streaming/src/main/java/org/apache/kylin/streaming/TopicMeta.java
new file mode 100644
index 0000000..b93d589
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/TopicMeta.java
@@ -0,0 +1,63 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * The topic metadata should be invariant, otherwise will cause re-initialization of the Consumer
+ *
+ * Created by qianzhou on 2/15/15.
+ */
+public class TopicMeta {
+
+ private final String name;
+
+ private final List<Integer> partitionIds;
+
+ public TopicMeta(String name, List<Integer> partitionIds) {
+ this.name = name;
+ this.partitionIds = Collections.unmodifiableList(partitionIds);
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public List<Integer> getPartitionIds() {
+ return partitionIds;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
deleted file mode 100644
index 074ef01..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import kafka.api.OffsetRequest;
-import kafka.cluster.Broker;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.PartitionMetadata;
-import kafka.message.MessageAndOffset;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Created by qianzhou on 2/15/15.
- */
-public class Consumer implements Runnable {
-
- private String topic;
- private int partitionId;
-
- private KafkaConfig kafkaConfig;
- private List<Broker> replicaBrokers;
- private AtomicLong offset = new AtomicLong();
- private BlockingQueue<Stream> streamQueue;
-
- private Logger logger;
-
- public Consumer(String topic, int partitionId, List<Broker> initialBrokers, KafkaConfig kafkaConfig) {
- this.topic = topic;
- this.partitionId = partitionId;
- this.kafkaConfig = kafkaConfig;
- this.replicaBrokers = initialBrokers;
- logger = LoggerFactory.getLogger("KafkaConsumer_" + topic + "_" + partitionId);
- streamQueue = new ArrayBlockingQueue<Stream>(kafkaConfig.getMaxReadCount());
- }
-
- public BlockingQueue<Stream> getStreamQueue() {
- return streamQueue;
- }
-
- private Broker getLeadBroker() {
- final PartitionMetadata partitionMetadata = Requester.getPartitionMetadata(topic, partitionId, replicaBrokers, kafkaConfig);
- if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
- replicaBrokers = partitionMetadata.replicas();
- return partitionMetadata.leader();
- } else {
- return null;
- }
- }
-
- @Override
- public void run() {
- while (true) {
- final Broker leadBroker = getLeadBroker();
- if (leadBroker == null) {
- logger.warn("cannot find lead broker");
- continue;
- }
- final long lastOffset = Requester.getLastOffset(topic, partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaConfig);
- offset.set(lastOffset);
- final FetchResponse fetchResponse = Requester.fetchResponse(topic, partitionId, offset.get(), leadBroker, kafkaConfig);
- if (fetchResponse.errorCode(topic, partitionId) != 0) {
- logger.warn("fetch response offset:" + offset.get() + " errorCode:" + fetchResponse.errorCode(topic, partitionId));
- continue;
- }
- for (MessageAndOffset messageAndOffset: fetchResponse.messageSet(topic, partitionId)) {
- final ByteBuffer payload = messageAndOffset.message().payload();
- //TODO use ByteBuffer maybe
- byte[] bytes = new byte[payload.limit()];
- payload.get(bytes);
- logger.debug("get message offset:" + messageAndOffset.offset());
- try {
- streamQueue.put(new Stream(System.currentTimeMillis(), bytes));
- } catch (InterruptedException e) {
- logger.error("error put streamQueue", e);
- }
- offset.incrementAndGet();
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/kafka/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/KafkaConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/KafkaConfig.java
deleted file mode 100644
index 82513eb..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/KafkaConfig.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import kafka.cluster.Broker;
-
-import java.util.List;
-import java.util.Properties;
-
-/**
- * Created by qianzhou on 3/2/15.
- */
-public class KafkaConfig {
-
- private List<Broker> brokers;
-
- private String zookeeper;
-
- private String topic;
-
- private int timeout;
-
- private int maxReadCount;
-
- private int bufferSize;
-
- private int partitionId;
-
- public int getTimeout() {
- return timeout;
- }
-
- public void setTimeout(int timeout) {
- this.timeout = timeout;
- }
-
- public int getMaxReadCount() {
- return maxReadCount;
- }
-
- public void setMaxReadCount(int maxReadCount) {
- this.maxReadCount = maxReadCount;
- }
-
- public int getBufferSize() {
- return bufferSize;
- }
-
- public void setBufferSize(int bufferSize) {
- this.bufferSize = bufferSize;
- }
-
- public List<Broker> getBrokers() {
- return brokers;
- }
-
- public void setBrokers(List<Broker> brokers) {
- this.brokers = brokers;
- }
-
- public String getZookeeper() {
- return zookeeper;
- }
-
- public void setZookeeper(String zookeeper) {
- this.zookeeper = zookeeper;
- }
-
- public String getTopic() {
- return topic;
- }
-
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
- public int getPartitionId() {
- return partitionId;
- }
-
- public void setPartitionId(int partitionId) {
- this.partitionId = partitionId;
- }
-
- public static KafkaConfig load(KafkaConfig config) {
- KafkaConfig result = new KafkaConfig();
- result.setBufferSize(config.getBufferSize());
- result.setMaxReadCount(config.getMaxReadCount());
- result.setTimeout(config.getTimeout());
- result.setTopic(config.getTopic());
- result.setZookeeper(config.getZookeeper());
- result.setPartitionId(config.getPartitionId());
- result.setBrokers(config.getBrokers());
- return result;
- }
-
- public static KafkaConfig load(Properties properties) {
- Preconditions.checkNotNull(properties);
- KafkaConfig result = new KafkaConfig();
- result.setBufferSize(Integer.parseInt(properties.getProperty("consumer.bufferSize")));
- result.setMaxReadCount(Integer.parseInt(properties.getProperty("consumer.maxReadCount")));
- result.setTimeout(Integer.parseInt(properties.getProperty("consumer.timeout")));
- result.setTopic(properties.getProperty("topic"));
- result.setZookeeper(properties.getProperty("zookeeper"));
- result.setPartitionId(Integer.parseInt(properties.getProperty("partitionId")));
-
- int id = 0;
- List<Broker> brokers = Lists.newArrayList();
- for (String str: properties.getProperty("brokers").split(",")) {
- final String[] split = str.split(":");
- final Broker broker = new Broker(id++, split[0], Integer.parseInt(split[1]));
- brokers.add(broker);
- }
- result.setBrokers(brokers);
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
deleted file mode 100644
index 8811695..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import kafka.api.FetchRequestBuilder;
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.cluster.Broker;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.*;
-import kafka.javaapi.consumer.SimpleConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Created by qianzhou on 2/15/15.
- */
-public final class Requester {
-
- private static final Logger logger = LoggerFactory.getLogger(Requester.class);
-
- public static TopicMeta getKafkaTopicMeta(KafkaConfig kafkaConfig) {
- SimpleConsumer consumer;
- for (Broker broker : kafkaConfig.getBrokers()) {
- consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), "topic_meta_lookup");
- List<String> topics = Collections.singletonList(kafkaConfig.getTopic());
- TopicMetadataRequest req = new TopicMetadataRequest(topics);
- TopicMetadataResponse resp = consumer.send(req);
- final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
- if (topicMetadatas.size() != 1) {
- break;
- }
- final TopicMetadata topicMetadata = topicMetadatas.get(0);
- if (topicMetadata.errorCode() != 0) {
- break;
- }
- List<Integer> partitionIds = Lists.transform(topicMetadata.partitionsMetadata(), new Function<PartitionMetadata, Integer>() {
- @Nullable
- @Override
- public Integer apply(PartitionMetadata partitionMetadata) {
- return partitionMetadata.partitionId();
- }
- });
- return new TopicMeta(kafkaConfig.getTopic(), partitionIds);
- }
- logger.debug("cannot find topic:" + kafkaConfig.getTopic());
- return null;
- }
-
- public static PartitionMetadata getPartitionMetadata(String topic, int partitionId, List<Broker> brokers, KafkaConfig kafkaConfig) {
- SimpleConsumer consumer;
- for (Broker broker : brokers) {
- consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), "topic_meta_lookup");
- List<String> topics = Collections.singletonList(topic);
- TopicMetadataRequest req = new TopicMetadataRequest(topics);
- TopicMetadataResponse resp = consumer.send(req);
- final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
- if (topicMetadatas.size() != 1) {
- logger.warn("invalid topicMetadata size:" + topicMetadatas.size());
- break;
- }
- final TopicMetadata topicMetadata = topicMetadatas.get(0);
- if (topicMetadata.errorCode() != 0) {
- logger.warn("fetching topicMetadata with errorCode:" + topicMetadata.errorCode());
- break;
- }
- for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
- if (partitionMetadata.partitionId() == partitionId) {
- return partitionMetadata;
- }
- }
- }
- logger.debug("cannot find PartitionMetadata, topic:" + topic + " partitionId:" + partitionId);
- return null;
- }
-
- public static FetchResponse fetchResponse(String topic, int partitionId, long offset, Broker broker, KafkaConfig kafkaConfig) {
- final String clientName = "client_" + topic + "_" + partitionId;
- SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), clientName);
- kafka.api.FetchRequest req = new FetchRequestBuilder()
- .clientId(clientName)
- .addFetch(topic, partitionId, offset, kafkaConfig.getMaxReadCount()) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
- .build();
- return consumer.fetch(req);
- }
-
- public static long getLastOffset(String topic, int partitionId,
- long whichTime, Broker broker, KafkaConfig kafkaConfig) {
- String clientName = "client_" + topic + "_" + partitionId;
- SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), kafkaConfig.getTimeout(), kafkaConfig.getBufferSize(), clientName);
- TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
- Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
- requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
- kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
- requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
- OffsetResponse response = consumer.getOffsetsBefore(request);
-
- if (response.hasError()) {
- System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partitionId));
- return 0;
- }
- long[] offsets = response.offsets(topic, partitionId);
- return offsets[0];
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/kafka/Stream.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Stream.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Stream.java
deleted file mode 100644
index 2a0ede4..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Stream.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-/**
- * Created by qianzhou on 2/15/15.
- */
-public class Stream {
-
- private long timestamp;
- private byte[] rawData;
-
- public Stream(long timestamp, byte[] rawData) {
- this.timestamp = timestamp;
- this.rawData = rawData;
- }
-
- public byte[] getRawData() {
- return rawData;
- }
-
- public long getTimestamp() {
- return timestamp;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/kafka/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/StreamBuilder.java
deleted file mode 100644
index 145b5c8..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/StreamBuilder.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import com.google.common.collect.Lists;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-
-/**
- * Created by qianzhou on 2/17/15.
- */
-public abstract class StreamBuilder implements Runnable {
-
- private List<BlockingQueue<Stream>> streamQueues;
- private static final Logger logger = LoggerFactory.getLogger(StreamBuilder.class);
- private final int batchBuildCount;
-
- public StreamBuilder(List<BlockingQueue<Stream>> streamQueues, int batchBuildCount) {
- this.streamQueues = streamQueues;
- this.batchBuildCount = batchBuildCount;
- }
-
-
- private int getEarliestStreamIndex(Stream[] streamHead) {
- long ts = Long.MAX_VALUE;
- int idx = 0;
- for (int i = 0; i < streamHead.length; i++) {
- if (streamHead[i].getTimestamp() < ts) {
- ts = streamHead[i].getTimestamp();
- idx = i;
- }
- }
- return idx;
- }
-
- protected abstract void build(List<Stream> streamsToBuild);
-
- @Override
- public void run() {
- try {
- Stream[] streamHead = new Stream[streamQueues.size()];
- for (int i = 0; i < streamQueues.size(); i++) {
- streamHead[i] = streamQueues.get(i).take();
- }
- List<Stream> streamToBuild = Lists.newArrayListWithCapacity(batchBuildCount);
- while (true) {
- if (streamToBuild.size() >= batchBuildCount) {
- build(streamToBuild);
- streamToBuild.clear();
- }
- int idx = getEarliestStreamIndex(streamHead);
- streamToBuild.add(streamHead[idx]);
- streamHead[idx] = streamQueues.get(idx).take();
- }
- } catch (InterruptedException e) {
- logger.error("", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicMeta.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicMeta.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicMeta.java
deleted file mode 100644
index 7822797..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/TopicMeta.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import java.util.Collections;
-import java.util.List;
-
-/**
- * The topic metadata should be invariant, otherwise will cause re-initialization of the Consumer
- *
- * Created by qianzhou on 2/15/15.
- */
-public class TopicMeta {
-
- private final String name;
-
- private final List<Integer> partitionIds;
-
- public TopicMeta(String name, List<Integer> partitionIds) {
- this.name = name;
- this.partitionIds = Collections.unmodifiableList(partitionIds);
- }
-
- public String getName() {
- return name;
- }
-
- public List<Integer> getPartitionIds() {
- return partitionIds;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/KafkaBaseTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/KafkaBaseTest.java b/streaming/src/test/java/org/apache/kylin/streaming/KafkaBaseTest.java
new file mode 100644
index 0000000..89277d2
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/KafkaBaseTest.java
@@ -0,0 +1,78 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import kafka.admin.AdminUtils;
+import kafka.common.TopicExistsException;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.kylin.streaming.KafkaConfig;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Created by qianzhou on 2/16/15.
+ */
+public abstract class KafkaBaseTest {
+
+ protected static final Logger logger = LoggerFactory.getLogger("kafka test");
+
+ protected static ZkClient zkClient;
+
+ protected static KafkaConfig kafkaConfig;
+
+ @BeforeClass
+ public static void beforeClass() throws IOException {
+ final Properties properties = new Properties();
+ properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
+ kafkaConfig = KafkaConfig.load(properties);
+
+ zkClient = new ZkClient(kafkaConfig.getZookeeper());
+ }
+
+
+ public static void createTopic(String topic, int partition, int replica) {
+ try {
+ AdminUtils.createTopic(zkClient, topic, partition, replica, new Properties());
+ } catch (TopicExistsException e) {
+ logger.info(e.getMessage());
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/KafkaConfigTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/KafkaConfigTest.java b/streaming/src/test/java/org/apache/kylin/streaming/KafkaConfigTest.java
new file mode 100644
index 0000000..5612763
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/KafkaConfigTest.java
@@ -0,0 +1,65 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import org.apache.kylin.streaming.KafkaConfig;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Created by qianzhou on 3/2/15.
+ */
+public class KafkaConfigTest {
+
+ @Test
+ public void test() throws IOException {
+ final Properties properties = new Properties();
+ properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
+ KafkaConfig config = KafkaConfig.load(properties);
+ assertEquals(1000, config.getMaxReadCount());
+ assertEquals(65536, config.getBufferSize());
+ assertEquals(60000, config.getTimeout());
+ assertEquals("sandbox.hortonworks.com:2181", config.getZookeeper());
+ assertEquals("kafka_stream_test", config.getTopic());
+ assertEquals(0, config.getPartitionId());
+ assertEquals(1, config.getBrokers().size());
+ assertEquals("sandbox.hortonworks.com:6667", config.getBrokers().get(0).getConnectionString());
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java b/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
new file mode 100644
index 0000000..91e06fc
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
@@ -0,0 +1,98 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Created by qianzhou on 2/16/15.
+ */
+public class KafkaConsumerTest extends KafkaBaseTest {
+
+ private TestProducer producer;
+
+ private static final int TOTAL_SEND_COUNT = 100;
+
+ @Before
+ public void before() throws IOException {
+ producer = new TestProducer(TOTAL_SEND_COUNT);
+ producer.start();
+ }
+
+ @After
+ public void after() {
+ producer.stop();
+ }
+
+ private void waitForProducerToStop(TestProducer producer) {
+ while (!producer.isStopped()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ @Test
+ public void test() throws InterruptedException {
+ final TopicMeta kafkaTopicMeta = Requester.getKafkaTopicMeta(kafkaConfig);
+ final ExecutorService executorService = Executors.newFixedThreadPool(kafkaTopicMeta.getPartitionIds().size());
+ List<BlockingQueue<Stream>> queues = Lists.newArrayList();
+ for (Integer partitionId : kafkaTopicMeta.getPartitionIds()) {
+ KafkaConsumer consumer = new KafkaConsumer(kafkaTopicMeta.getName(), partitionId, kafkaConfig.getBrokers(), kafkaConfig);
+ queues.add(consumer.getStreamQueue());
+ executorService.execute(consumer);
+ }
+ waitForProducerToStop(producer);
+ int count = 0;
+ for (BlockingQueue<Stream> queue : queues) {
+ count += queue.size();
+ }
+ //since there will be historical data
+ assertTrue(count >= TOTAL_SEND_COUNT);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/RequesterTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/RequesterTest.java b/streaming/src/test/java/org/apache/kylin/streaming/RequesterTest.java
new file mode 100644
index 0000000..d54f22c
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/RequesterTest.java
@@ -0,0 +1,71 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import org.apache.kylin.streaming.KafkaConfig;
+import org.apache.kylin.streaming.Requester;
+import org.apache.kylin.streaming.TopicMeta;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Created by qianzhou on 2/16/15.
+ */
+public class RequesterTest extends KafkaBaseTest {
+
+ private static final String NON_EXISTED_TOPIC = "non_existent_topic";
+
+
+
+ @AfterClass
+ public static void afterClass() {
+ }
+
+ @Test
+ public void testTopicMeta() throws Exception {
+ TopicMeta kafkaTopicMeta = Requester.getKafkaTopicMeta(kafkaConfig);
+ assertNotNull(kafkaTopicMeta);
+ assertEquals(2, kafkaTopicMeta.getPartitionIds().size());
+ assertEquals(kafkaConfig.getTopic(), kafkaTopicMeta.getName());
+
+ KafkaConfig anotherTopicConfig = KafkaConfig.load(kafkaConfig);
+ anotherTopicConfig.setTopic(NON_EXISTED_TOPIC);
+
+ kafkaTopicMeta = Requester.getKafkaTopicMeta(anotherTopicConfig);
+ assertTrue(kafkaTopicMeta == null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/TestProducer.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/TestProducer.java b/streaming/src/test/java/org/apache/kylin/streaming/TestProducer.java
new file mode 100644
index 0000000..368904c
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/TestProducer.java
@@ -0,0 +1,114 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import kafka.cluster.Broker;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Created by qianzhou on 2/16/15.
+ */
+public class TestProducer {
+
+ private volatile boolean stopped = false;
+
+ private static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
+
+ private final int sendCount;
+
+ public TestProducer(int sendCount) {
+ this.sendCount = sendCount;
+ }
+
+ public void start() throws IOException {
+ final Properties properties = new Properties();
+ properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
+ final KafkaConfig kafkaConfig = KafkaConfig.load(properties);
+
+ Properties props = new Properties();
+ props.put("metadata.broker.list", StringUtils.join(Iterators.transform(kafkaConfig.getBrokers().iterator(), new Function<Broker, String>() {
+ @Nullable
+ @Override
+ public String apply(@Nullable Broker broker) {
+ return broker.getConnectionString();
+ }
+ }), ","));
+ props.put("serializer.class", "kafka.serializer.StringEncoder");
+ props.put("request.required.acks", "1");
+ ProducerConfig config = new ProducerConfig(props);
+ final Producer<String, String> producer = new Producer<String, String>(config);
+
+ final Thread thread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ int count = 0;
+ while (!stopped && count < sendCount) {
+ final KeyedMessage<String, String> message = new KeyedMessage<>(kafkaConfig.getTopic(), "current time is:" + System.currentTimeMillis());
+ producer.send(message);
+ count++;
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ logger.debug("totally " + count +" messages have been sent");
+ stopped = true;
+
+ }
+ });
+ thread.setDaemon(false);
+ thread.start();
+ }
+
+ public boolean isStopped() {
+ return stopped;
+ }
+
+ public void stop() {
+ stopped = true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java
deleted file mode 100644
index 537a15d..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaBaseTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import kafka.admin.AdminUtils;
-import kafka.common.TopicExistsException;
-import org.I0Itec.zkclient.ZkClient;
-import org.junit.BeforeClass;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Properties;
-
-/**
- * Created by qianzhou on 2/16/15.
- */
-public abstract class KafkaBaseTest {
-
- protected static final Logger logger = LoggerFactory.getLogger("kafka test");
-
- protected static ZkClient zkClient;
-
- protected static KafkaConfig kafkaConfig;
-
- @BeforeClass
- public static void beforeClass() throws IOException {
- final Properties properties = new Properties();
- properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
- kafkaConfig = KafkaConfig.load(properties);
-
- zkClient = new ZkClient(kafkaConfig.getZookeeper());
- }
-
-
- public static void createTopic(String topic, int partition, int replica) {
- try {
- AdminUtils.createTopic(zkClient, topic, partition, replica, new Properties());
- } catch (TopicExistsException e) {
- logger.info(e.getMessage());
- }
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConfigTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConfigTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConfigTest.java
deleted file mode 100644
index 3c9bd87..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConfigTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Created by qianzhou on 3/2/15.
- */
-public class KafkaConfigTest {
-
- @Test
- public void test() throws IOException {
- final Properties properties = new Properties();
- properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
- KafkaConfig config = KafkaConfig.load(properties);
- assertEquals(1000, config.getMaxReadCount());
- assertEquals(65536, config.getBufferSize());
- assertEquals(60000, config.getTimeout());
- assertEquals("sandbox.hortonworks.com:2181", config.getZookeeper());
- assertEquals("kafka_stream_test", config.getTopic());
- assertEquals(0, config.getPartitionId());
- assertEquals(1, config.getBrokers().size());
- assertEquals("sandbox.hortonworks.com:6667", config.getBrokers().get(0).getConnectionString());
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java
deleted file mode 100644
index 4449d37..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/KafkaConsumerTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import com.google.common.collect.Lists;
-import kafka.cluster.Broker;
-import kafka.consumer.ConsumerConfig;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static org.junit.Assert.assertTrue;
-
-/**
- * Created by qianzhou on 2/16/15.
- */
-public class KafkaConsumerTest extends KafkaBaseTest {
-
- private TestProducer producer;
-
- private static final int TOTAL_SEND_COUNT = 100;
-
- @Before
- public void before() throws IOException {
- producer = new TestProducer(TOTAL_SEND_COUNT);
- producer.start();
- }
-
- @After
- public void after() {
- producer.stop();
- }
-
- private void waitForProducerToStop(TestProducer producer) {
- while (!producer.isStopped()) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-
- @Test
- public void test() throws InterruptedException {
- final TopicMeta kafkaTopicMeta = Requester.getKafkaTopicMeta(kafkaConfig);
- final ExecutorService executorService = Executors.newFixedThreadPool(kafkaTopicMeta.getPartitionIds().size());
- List<BlockingQueue<Stream>> queues = Lists.newArrayList();
- for (Integer partitionId : kafkaTopicMeta.getPartitionIds()) {
- Consumer consumer = new Consumer(kafkaTopicMeta.getName(), partitionId, kafkaConfig.getBrokers(), kafkaConfig);
- queues.add(consumer.getStreamQueue());
- executorService.execute(consumer);
- }
- waitForProducerToStop(producer);
- int count = 0;
- for (BlockingQueue<Stream> queue : queues) {
- count += queue.size();
- }
- //since there will be historical data
- assertTrue(count >= TOTAL_SEND_COUNT);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java
deleted file mode 100644
index 694c1fd..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/RequesterTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import org.junit.AfterClass;
-import org.junit.Test;
-
-import java.util.Collections;
-
-import static org.junit.Assert.*;
-
-/**
- * Created by qianzhou on 2/16/15.
- */
-public class RequesterTest extends KafkaBaseTest {
-
- private static final String NON_EXISTED_TOPIC = "non_existent_topic";
-
-
-
- @AfterClass
- public static void afterClass() {
- }
-
- @Test
- public void testTopicMeta() throws Exception {
- TopicMeta kafkaTopicMeta = Requester.getKafkaTopicMeta(kafkaConfig);
- assertNotNull(kafkaTopicMeta);
- assertEquals(2, kafkaTopicMeta.getPartitionIds().size());
- assertEquals(kafkaConfig.getTopic(), kafkaTopicMeta.getName());
-
- KafkaConfig anotherTopicConfig = KafkaConfig.load(kafkaConfig);
- anotherTopicConfig.setTopic(NON_EXISTED_TOPIC);
-
- kafkaTopicMeta = Requester.getKafkaTopicMeta(anotherTopicConfig);
- assertTrue(kafkaTopicMeta == null);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0970d76f/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java b/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java
deleted file mode 100644
index cd3b166..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/kafka/TestProducer.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.kafka;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-import kafka.cluster.Broker;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.Properties;
-
-/**
- * Created by qianzhou on 2/16/15.
- */
-public class TestProducer {
-
- private volatile boolean stopped = false;
-
- private static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
-
- private final int sendCount;
-
- public TestProducer(int sendCount) {
- this.sendCount = sendCount;
- }
-
- public void start() throws IOException {
- final Properties properties = new Properties();
- properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
- final KafkaConfig kafkaConfig = KafkaConfig.load(properties);
-
- Properties props = new Properties();
- props.put("metadata.broker.list", StringUtils.join(Iterators.transform(kafkaConfig.getBrokers().iterator(), new Function<Broker, String>() {
- @Nullable
- @Override
- public String apply(@Nullable Broker broker) {
- return broker.getConnectionString();
- }
- }), ","));
- props.put("serializer.class", "kafka.serializer.StringEncoder");
- props.put("request.required.acks", "1");
- ProducerConfig config = new ProducerConfig(props);
- final Producer<String, String> producer = new Producer<String, String>(config);
-
- final Thread thread = new Thread(new Runnable() {
- @Override
- public void run() {
- int count = 0;
- while (!stopped && count < sendCount) {
- final KeyedMessage<String, String> message = new KeyedMessage<>(kafkaConfig.getTopic(), "current time is:" + System.currentTimeMillis());
- producer.send(message);
- count++;
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- logger.debug("totally " + count +" messages have been sent");
- stopped = true;
-
- }
- });
- thread.setDaemon(false);
- thread.start();
- }
-
- public boolean isStopped() {
- return stopped;
- }
-
- public void stop() {
- stopped = true;
- }
-}