You are viewing a plain text version of this content. The canonical link for it is available in the HTML version of this archive page.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/05/28 13:25:18 UTC

incubator-kylin git commit: add multi consumer for single CubeStreamBuilder

Repository: incubator-kylin
Updated Branches:
  refs/heads/streaming-cubing 1ecf68185 -> 2f1b5bbb4


add multi consumer for single CubeStreamBuilder


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/2f1b5bbb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/2f1b5bbb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/2f1b5bbb

Branch: refs/heads/streaming-cubing
Commit: 2f1b5bbb4d9640030d5eb3b68f43dbb14487fbb1
Parents: 1ecf681
Author: qianhao.zhou <qi...@ebay.com>
Authored: Thu May 28 19:25:11 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Thu May 28 19:25:11 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/streaming/StreamingBootstrap.java | 45 +++++++++++++++-----
 1 file changed, 34 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f1b5bbb/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index adb9ea6..3929098 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -35,6 +35,7 @@
 package org.apache.kylin.job.streaming;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import kafka.api.OffsetRequest;
 import kafka.cluster.Broker;
@@ -128,22 +129,44 @@ public class StreamingBootstrap {
         }
     }
 
+    private List<BlockingQueue<StreamMessage>> consume(KafkaConfig kafkaConfig, final int partitionCount) {
+        List<BlockingQueue<StreamMessage>> result = Lists.newArrayList();
+        for (int partitionId = 0 ; partitionId < partitionCount && partitionId < 10; ++partitionId) {
+            final Broker leadBroker = getLeadBroker(kafkaConfig, partitionId);
+            long streamingOffset = getEarliestStreamingOffset(kafkaConfig.getName(), 0, 0);
+            final long latestOffset = KafkaRequester.getLastOffset(kafkaConfig.getTopic(), partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaConfig);
+            streamingOffset = Math.max(streamingOffset, latestOffset);
+            KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), partitionId,
+                    streamingOffset, kafkaConfig.getBrokers(), kafkaConfig, 1);
+            Executors.newSingleThreadExecutor().submit(consumer);
+            result.add(consumer.getStreamQueue(0));
+        }
+        return result;
+    }
+
     private void startCubeStreaming(KafkaConfig kafkaConfig, final int partitionId, final int partitionCount) throws Exception {
         final String cubeName = kafkaConfig.getCubeName();
         final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
-        LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<>();
-        CubeStreamBuilder cubeStreamBuilder = new CubeStreamBuilder(queue, cubeName);
-        final Broker leadBroker = getLeadBroker(kafkaConfig, partitionId);
-
-        long streamingOffset = getEarliestStreamingOffset(kafkaConfig.getName(), 0, 0);
-        final long earliestOffset = KafkaRequester.getLastOffset(kafkaConfig.getTopic(), partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaConfig);
-        streamingOffset = Math.max(streamingOffset, earliestOffset);
 
-        KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), partitionId,
-                streamingOffset, kafkaConfig.getBrokers(), kafkaConfig, 1);
-        Executors.newSingleThreadExecutor().submit(consumer);
-        final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
+        final List<BlockingQueue<StreamMessage>> queues = consume(kafkaConfig, partitionCount);
+        final LinkedBlockingDeque<StreamMessage> streamQueue = new LinkedBlockingDeque<>();
+        Executors.newSingleThreadExecutor().execute(new Runnable() {
+            @Override
+            public void run() {
+                while (true) {
+                    for (BlockingQueue<StreamMessage> queue : queues) {
+                        try {
+                            streamQueue.put(queue.take());
+                        } catch (InterruptedException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                }
+            }
+        });
+        CubeStreamBuilder cubeStreamBuilder = new CubeStreamBuilder(streamQueue, cubeName);
         cubeStreamBuilder.setStreamParser(getStreamParser(kafkaConfig, cubeInstance.getAllColumns()));
+        final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
         future.get();
     }