You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/05/28 13:25:18 UTC
incubator-kylin git commit: add multi consumer for single CubeStreamBuilder
Repository: incubator-kylin
Updated Branches:
refs/heads/streaming-cubing 1ecf68185 -> 2f1b5bbb4
add multi consumer for single CubeStreamBuilder
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/2f1b5bbb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/2f1b5bbb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/2f1b5bbb
Branch: refs/heads/streaming-cubing
Commit: 2f1b5bbb4d9640030d5eb3b68f43dbb14487fbb1
Parents: 1ecf681
Author: qianhao.zhou <qi...@ebay.com>
Authored: Thu May 28 19:25:11 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Thu May 28 19:25:11 2015 +0800
----------------------------------------------------------------------
.../kylin/job/streaming/StreamingBootstrap.java | 45 +++++++++++++++-----
1 file changed, 34 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f1b5bbb/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index adb9ea6..3929098 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -35,6 +35,7 @@
package org.apache.kylin.job.streaming;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import kafka.api.OffsetRequest;
import kafka.cluster.Broker;
@@ -128,22 +129,44 @@ public class StreamingBootstrap {
}
}
+ /**
+ * Starts one {@link KafkaConsumer} per partition id in [0, partitionCount), each submitted to
+ * its own single-thread executor, and collects stream queue 0 of every consumer.
+ * The offset handed to each consumer is the larger of the stored streaming offset and the
+ * partition's latest offset as reported by Kafka.
+ *
+ * @param kafkaConfig    topic, broker and naming configuration for the stream
+ * @param partitionCount number of partitions to consume; every id below it is consumed
+ * @return one message queue per partition, in partition-id order
+ */
+ private List<BlockingQueue<StreamMessage>> consume(KafkaConfig kafkaConfig, final int partitionCount) {
+ List<BlockingQueue<StreamMessage>> result = Lists.newArrayList();
+ // No hard-coded upper bound: every partition up to partitionCount must be consumed,
+ // otherwise messages from the higher-numbered partitions are silently dropped.
+ for (int partitionId = 0; partitionId < partitionCount; ++partitionId) {
+ final Broker leadBroker = getLeadBroker(kafkaConfig, partitionId);
+ // NOTE(review): the stored offset is looked up with fixed arguments (0, 0) for every
+ // partition; confirm whether partitionId should be passed here instead.
+ long streamingOffset = getEarliestStreamingOffset(kafkaConfig.getName(), 0, 0);
+ final long latestOffset = KafkaRequester.getLastOffset(kafkaConfig.getTopic(), partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaConfig);
+ streamingOffset = Math.max(streamingOffset, latestOffset);
+ KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), partitionId,
+ streamingOffset, kafkaConfig.getBrokers(), kafkaConfig, 1);
+ Executors.newSingleThreadExecutor().submit(consumer);
+ result.add(consumer.getStreamQueue(0));
+ }
+ return result;
+ }
+
private void startCubeStreaming(KafkaConfig kafkaConfig, final int partitionId, final int partitionCount) throws Exception {
final String cubeName = kafkaConfig.getCubeName();
final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
- LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<>();
- CubeStreamBuilder cubeStreamBuilder = new CubeStreamBuilder(queue, cubeName);
- final Broker leadBroker = getLeadBroker(kafkaConfig, partitionId);
-
- long streamingOffset = getEarliestStreamingOffset(kafkaConfig.getName(), 0, 0);
- final long earliestOffset = KafkaRequester.getLastOffset(kafkaConfig.getTopic(), partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaConfig);
- streamingOffset = Math.max(streamingOffset, earliestOffset);
- KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), partitionId,
- streamingOffset, kafkaConfig.getBrokers(), kafkaConfig, 1);
- Executors.newSingleThreadExecutor().submit(consumer);
- final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
+ final List<BlockingQueue<StreamMessage>> queues = consume(kafkaConfig, partitionCount);
+ final LinkedBlockingDeque<StreamMessage> streamQueue = new LinkedBlockingDeque<>();
+ Executors.newSingleThreadExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ while (true) {
+ for (BlockingQueue<StreamMessage> queue : queues) {
+ try {
+ streamQueue.put(queue.take());
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+ });
+ CubeStreamBuilder cubeStreamBuilder = new CubeStreamBuilder(streamQueue, cubeName);
cubeStreamBuilder.setStreamParser(getStreamParser(kafkaConfig, cubeInstance.getAllColumns()));
+ final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
future.get();
}