Posted to commits@kylin.apache.org by qh...@apache.org on 2015/06/05 06:35:50 UTC

[2/3] incubator-kylin git commit: refactor

refactor: persist per-partition Kafka offsets and resume from them on restart. StreamBuilder now takes the streaming name and reports each partition's last consumed offset to StreamingManager after every micro batch; StreamingBootstrap assigns globally unique partition ids across clusters and validates committed offsets against the broker's earliest/latest before resuming. MicroStreamBatch and StreamFetcher carry the partition id to make that bookkeeping possible.


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/11e9d3e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/11e9d3e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/11e9d3e9

Branch: refs/heads/0.8.0
Commit: 11e9d3e92d3e886006ee526a2b3581bcf00f2bfe
Parents: 6133d73
Author: qianhao.zhou <qi...@ebay.com>
Authored: Thu Jun 4 20:29:30 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Fri Jun 5 11:57:01 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/streaming/StreamingBootstrap.java | 38 +++++++++++++++-----
 .../apache/kylin/job/BuildIIWithStreamTest.java |  3 +-
 .../kylin/job/hadoop/invertedindex/IITest.java  |  2 +-
 .../job/streaming/CubeStreamConsumerTest.java   |  2 +-
 .../kylin/streaming/MicroStreamBatch.java       | 10 +++++-
 .../apache/kylin/streaming/StreamBuilder.java   | 31 +++++++++++++---
 6 files changed, 69 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/11e9d3e9/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index fd590d2..436c55b 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -62,6 +62,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import java.lang.reflect.Constructor;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
@@ -135,16 +136,21 @@ public class StreamingBootstrap {
         }
     }
 
-    private List<BlockingQueue<StreamMessage>> consume(int clusterID, KafkaClusterConfig kafkaClusterConfig, final int partitionCount) {
+    private List<BlockingQueue<StreamMessage>> consume(int clusterID, KafkaClusterConfig kafkaClusterConfig, final int partitionCount, final Map<Integer, Long> partitionIdOffsetMap, final int partitionOffset) {
         List<BlockingQueue<StreamMessage>> result = Lists.newArrayList();
         for (int partitionId = 0; partitionId < partitionCount; ++partitionId) {
             final Broker leadBroker = getLeadBroker(kafkaClusterConfig, partitionId);
-
+            final int transferredPartitionId = partitionId + partitionOffset;
             final long latestOffset = KafkaRequester.getLastOffset(kafkaClusterConfig.getTopic(), partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaClusterConfig);
             long streamingOffset = latestOffset;
-            logger.info("submitting offset:" + streamingOffset);
-
-            KafkaConsumer consumer = new KafkaConsumer(clusterID, kafkaClusterConfig.getTopic(), partitionId, streamingOffset, kafkaClusterConfig.getBrokers(), kafkaClusterConfig, 1);
+            if (partitionIdOffsetMap.containsKey(transferredPartitionId)) {
+                final long earliestOffset = KafkaRequester.getLastOffset(kafkaClusterConfig.getTopic(), partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaClusterConfig);
+                long committedOffset = partitionIdOffsetMap.get(transferredPartitionId);
+                Preconditions.checkArgument(committedOffset >= earliestOffset && committedOffset <= latestOffset, String.format("invalid offset:%d, earliestOffset:%d, latestOffset:%d", committedOffset, earliestOffset, latestOffset));
+                streamingOffset = committedOffset;
+            }
+            logger.info("starting offset:" + streamingOffset + " cluster id:" + clusterID + " partitionId:" + partitionId + " transferredPartitionId:" + transferredPartitionId);
+            KafkaConsumer consumer = new KafkaConsumer(clusterID, kafkaClusterConfig.getTopic(), partitionId, streamingOffset, kafkaClusterConfig.getBrokers(), kafkaClusterConfig);
             Executors.newSingleThreadExecutor().submit(consumer);
             result.add(consumer.getStreamQueue(0));
         }
@@ -156,15 +162,28 @@ public class StreamingBootstrap {
 
         final List<BlockingQueue<StreamMessage>> allClustersData = Lists.newArrayList();
 
+        ArrayList<Integer> allPartitions = Lists.newArrayList();
+        int partitionOffset = 0;
+        for (KafkaClusterConfig kafkaClusterConfig : kafkaClusterConfigs) {
+            final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
+            for (int i = 0; i < partitionCount; i++) {
+                allPartitions.add(i + partitionOffset);
+            }
+            partitionOffset += partitionCount;
+        }
+        final Map<Integer, Long> partitionIdOffsetMap = streamingManager.getOffset(streamingConfig.getName(), allPartitions);
+
         int clusterID = 0;
+        partitionOffset = 0;
         for (KafkaClusterConfig kafkaClusterConfig : kafkaClusterConfigs) {
             final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
             Preconditions.checkArgument(partitionId >= 0 && partitionId < partitionCount, "invalid partition id:" + partitionId);
 
-            final List<BlockingQueue<StreamMessage>> oneClusterData = consume(clusterID, kafkaClusterConfig, partitionCount);
+            final List<BlockingQueue<StreamMessage>> oneClusterData = consume(clusterID, kafkaClusterConfig, partitionCount, partitionIdOffsetMap, partitionOffset);
             logger.info("Cluster {} with {} partitions", allClustersData.size(), oneClusterData.size());
             allClustersData.addAll(oneClusterData);
             clusterID++;
+            partitionOffset += partitionCount;
         }
 
         final String cubeName = streamingConfig.getCubeName();
@@ -174,7 +193,7 @@ public class StreamingBootstrap {
         MicroBatchCondition condition = new MicroBatchCondition(Integer.MAX_VALUE, batchInterval);
         long startTimestamp = cubeInstance.getDateRangeEnd() == 0 ? TimeUtil.getNextPeriodStart(System.currentTimeMillis(), (long) batchInterval) : cubeInstance.getDateRangeEnd();
         logger.info("batch time interval is {} to {}", DateFormat.formatToTimeStr(startTimestamp), DateFormat.formatToTimeStr(startTimestamp + batchInterval));
-        StreamBuilder cubeStreamBuilder = new StreamBuilder(allClustersData, condition, new CubeStreamConsumer(cubeName), startTimestamp);
+        StreamBuilder cubeStreamBuilder = new StreamBuilder(streamingConfig.getName(), allClustersData, condition, new CubeStreamConsumer(cubeName), startTimestamp);
         cubeStreamBuilder.setStreamParser(getStreamParser(streamingConfig, Lists.transform(new CubeJoinedFlatTableDesc(cubeInstance.getDescriptor(), null).getColumnList(), new Function<IntermediateColumnDesc, TblColRef>() {
             @Nullable
             @Override
@@ -242,7 +261,10 @@ public class StreamingBootstrap {
         Executors.newSingleThreadExecutor().submit(consumer);
         final ExecutorService streamingBuilderPool = Executors.newFixedThreadPool(parallelism);
         for (int i = startShard; i < endShard; ++i) {
-            final StreamBuilder task = new StreamBuilder(consumer.getStreamQueue(i % parallelism), new MicroBatchCondition(iiDesc.getSliceSize(), Integer.MAX_VALUE), new IIStreamConsumer(streamingConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiDesc, i), 0L);
+            final StreamBuilder task = new StreamBuilder(streamingConfig.getName(),
+                    consumer.getStreamQueue(i % parallelism),
+                    new MicroBatchCondition(iiDesc.getSliceSize(), Integer.MAX_VALUE),
+                    new IIStreamConsumer(streamingConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiDesc, i), 0L);
             task.setStreamParser(getStreamParser(streamingConfig, ii.getDescriptor().listAllColumns()));
             if (i == endShard - 1) {
                 streamingBuilderPool.submit(task).get();

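Two mechanisms are introduced in StreamingBootstrap above. First, partitions receive globally unique ids across Kafka clusters: each cluster's local ids 0..n-1 are shifted by the running total of the preceding clusters' partition counts (transferredPartitionId = partitionId + partitionOffset). Second, when StreamingManager holds a committed offset for that global id, consumption resumes from it after checking that it still lies inside the broker's retained range. A self-contained sketch of both rules follows; the class and method names are illustrative, not Kylin API:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    final class OffsetResume {

        // Global ids across clusters: cluster 0 with 3 partitions gets
        // ids 0,1,2; cluster 1 with 2 partitions gets 3,4; and so on.
        static List<Integer> globalPartitionIds(int[] partitionCounts) {
            List<Integer> all = new ArrayList<>();
            int base = 0;
            for (int count : partitionCounts) {
                for (int local = 0; local < count; local++) {
                    all.add(base + local);
                }
                base += count;
            }
            return all;
        }

        // Resume rule: with no checkpoint, start at the latest offset
        // (skipping history); a checkpoint must fall in [earliest, latest],
        // otherwise the broker has already discarded the data behind it.
        static long startOffset(Map<Integer, Long> committed, int globalId,
                                long earliest, long latest) {
            Long checkpoint = committed.get(globalId);
            if (checkpoint == null) {
                return latest;
            }
            if (checkpoint < earliest || checkpoint > latest) {
                throw new IllegalArgumentException(String.format(
                        "invalid offset:%d, earliestOffset:%d, latestOffset:%d",
                        checkpoint, earliest, latest));
            }
            return checkpoint;
        }
    }

The range check matters because Kafka retention can delete messages behind a stale checkpoint; failing fast on an out-of-range offset is safer than silently consuming from the wrong position.
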
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/11e9d3e9/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
index e84b176..6d660e5 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
@@ -214,7 +214,8 @@ public class BuildIIWithStreamTest {
         ToolRunner.run(new IICreateHTableJob(), args);
 
         ExecutorService executorService = Executors.newSingleThreadExecutor();
-        final StreamBuilder streamBuilder = new StreamBuilder(queue,
+        final StreamBuilder streamBuilder = new StreamBuilder(iiName,
+                queue,
                 new MicroBatchCondition(segment.getIIDesc().getSliceSize(), Integer.MAX_VALUE),
                 new IIStreamConsumer(iiName, segment.getStorageLocationIdentifier(), segment.getIIDesc(), 0),
                 0);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/11e9d3e9/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index b2fb5ea..9a28ea4 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -85,7 +85,7 @@ public class IITest extends LocalFileMetadataTestCase {
         List<List<String>> parsedStreamMessages = Lists.newArrayList();
         StreamParser parser = StringStreamParser.instance;
 
-        MicroStreamBatch batch = new MicroStreamBatch();
+        MicroStreamBatch batch = new MicroStreamBatch(0);
         for (StreamMessage message : streamMessages) {
             ParsedStreamMessage parsedStreamMessage = parser.parse(message);
             if ((parsedStreamMessage.isAccepted())) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/11e9d3e9/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
index 328ec72..ca0d037 100644
--- a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
+++ b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
@@ -64,7 +64,7 @@ public class CubeStreamConsumerTest {
     @Test
     public void test() throws Exception {
         LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<>();
-        StreamBuilder cubeStreamBuilder = new StreamBuilder(queue, new MicroBatchCondition(Integer.MAX_VALUE, 30 * 1000), new CubeStreamConsumer(CUBE_NAME), System.currentTimeMillis());
+        StreamBuilder cubeStreamBuilder = new StreamBuilder(CUBE_NAME, queue, new MicroBatchCondition(Integer.MAX_VALUE, 30 * 1000), new CubeStreamConsumer(CUBE_NAME), System.currentTimeMillis());
         final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
         loadDataFromLocalFile(queue, 100000);
         future.get();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/11e9d3e9/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java b/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
index e1ff60c..6ecd592 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
@@ -9,6 +9,8 @@ import java.util.List;
  */
 public final class MicroStreamBatch {
 
+    private final int partitionId;
+
     private final List<List<String>> streams;
 
     private final Pair<Long, Long> timestamp;
@@ -17,18 +19,24 @@ public final class MicroStreamBatch {
 
     private int rawMessageCount;
 
-    public MicroStreamBatch() {
+    public MicroStreamBatch(int partitionId) {
+        this.partitionId = partitionId;
         this.streams = Lists.newLinkedList();
         this.timestamp = Pair.newPair(Long.MAX_VALUE, Long.MIN_VALUE);
         this.offset = Pair.newPair(Long.MAX_VALUE, Long.MIN_VALUE);
     }
 
     private MicroStreamBatch(MicroStreamBatch batch) {
+        this.partitionId = batch.partitionId;
         this.streams = Lists.newLinkedList(batch.streams);
         this.timestamp = Pair.newPair(batch.timestamp.getFirst(), batch.timestamp.getSecond());
         this.offset = Pair.newPair(batch.offset.getFirst(), batch.offset.getSecond());
     }
 
+    public int getPartitionId() {
+        return partitionId;
+    }
+
     public final List<List<String>> getStreams() {
         return streams;
     }

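The timestamp and offset pairs above start as (Long.MAX_VALUE, Long.MIN_VALUE) so that the first message accepted into a batch initializes both ends and every later message can only widen the range; the second element of the offset pair is what ultimately gets checkpointed. The same min/max idiom, reduced to plain longs standing in for Kylin's Pair (illustrative, not the actual class):

    // Mirrors MicroStreamBatch's offset pair: min/max start at the
    // extremes, the first accept() sets both ends, later calls only widen.
    final class OffsetRange {
        private long min = Long.MAX_VALUE;
        private long max = Long.MIN_VALUE;

        void accept(long offset) {
            if (offset < min) min = offset;
            if (offset > max) max = offset;
        }

        // The offset consumption should resume after; compare
        // MicroStreamBatch.getOffset().getSecond() in StreamBuilder.
        long last() { return max; }
    }
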
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/11e9d3e9/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index 8e5b0ca..cb8dfb1 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -36,11 +36,14 @@ package org.apache.kylin.streaming;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.DateFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.*;
 
@@ -59,8 +62,11 @@ public class StreamBuilder implements Runnable {
 
     private final MicroBatchCondition condition;
 
-    public StreamBuilder(List<BlockingQueue<StreamMessage>> inputs, MicroBatchCondition condition, MicroStreamBatchConsumer consumer, long startTimestamp) {
+    private final String streaming;
+
+    public StreamBuilder(String streaming, List<BlockingQueue<StreamMessage>> inputs, MicroBatchCondition condition, MicroStreamBatchConsumer consumer, long startTimestamp) {
         Preconditions.checkArgument(inputs.size() > 0);
+        this.streaming = streaming;
         this.streamMessageQueues = Lists.newArrayList();
         this.consumer = Preconditions.checkNotNull(consumer);
         this.condition = condition;
@@ -68,7 +74,8 @@ public class StreamBuilder implements Runnable {
         init(inputs);
     }
 
-    public StreamBuilder(BlockingQueue<StreamMessage> input, MicroBatchCondition condition, MicroStreamBatchConsumer consumer, long startTimestamp) {
+    public StreamBuilder(String streaming, BlockingQueue<StreamMessage> input, MicroBatchCondition condition, MicroStreamBatchConsumer consumer, long startTimestamp) {
+        this.streaming = streaming;
         this.streamMessageQueues = Lists.newArrayList();
         this.consumer = Preconditions.checkNotNull(consumer);
         this.condition = condition;
@@ -90,11 +97,16 @@ public class StreamBuilder implements Runnable {
             final int inputCount = streamMessageQueues.size();
             final ExecutorService executorService = Executors.newFixedThreadPool(inputCount);
             long start = startTimestamp;
+            List<Integer> partitions = Lists.newArrayList();
+            for (int i = 0, partitionCount = streamMessageQueues.size(); i < partitionCount; i++) {
+                partitions.add(i);
+            }
             while (true) {
                 CountDownLatch countDownLatch = new CountDownLatch(inputCount);
                 ArrayList<Future<MicroStreamBatch>> futures = Lists.newArrayListWithExpectedSize(inputCount);
+                int partitionId = 0;
                 for (BlockingQueue<StreamMessage> streamMessageQueue : streamMessageQueues) {
-                    futures.add(executorService.submit(new StreamFetcher(streamMessageQueue, countDownLatch, start, start + condition.getBatchInterval())));
+                    futures.add(executorService.submit(new StreamFetcher(partitionId++, streamMessageQueue, countDownLatch, start, start + condition.getBatchInterval())));
                 }
                 countDownLatch.await();
                 ArrayList<MicroStreamBatch> batches = Lists.newArrayListWithExpectedSize(inputCount);
@@ -126,6 +138,13 @@ public class StreamBuilder implements Runnable {
                     long startTime = System.currentTimeMillis();
                     consumer.consume(batch);
                     logger.info("Batch build costs {} milliseconds", System.currentTimeMillis() - startTime);
+                    if (batches.size() > 1) {
+                        final HashMap<Integer, Long> offset = Maps.newHashMap();
+                        for (MicroStreamBatch microStreamBatch : batches) {
+                            offset.put(microStreamBatch.getPartitionId(), microStreamBatch.getOffset().getSecond());
+                        }
+                        StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).updateOffset(streaming, offset);
+                    }
                 }
             }
         } catch (InterruptedException e) {
@@ -143,10 +162,12 @@ public class StreamBuilder implements Runnable {
 
         private final BlockingQueue<StreamMessage> streamMessageQueue;
         private final CountDownLatch countDownLatch;
+        private final int partitionId;
         private long startTimestamp;
         private long endTimestamp;
 
-        public StreamFetcher(BlockingQueue<StreamMessage> streamMessageQueue, CountDownLatch countDownLatch, long startTimestamp, long endTimestamp) {
+        public StreamFetcher(int partitionId, BlockingQueue<StreamMessage> streamMessageQueue, CountDownLatch countDownLatch, long startTimestamp, long endTimestamp) {
+            this.partitionId = partitionId;
             this.streamMessageQueue = streamMessageQueue;
             this.countDownLatch = countDownLatch;
             this.startTimestamp = startTimestamp;
@@ -183,7 +204,7 @@ public class StreamBuilder implements Runnable {
                 MicroStreamBatch microStreamBatch = null;
                 while (true) {
                     if (microStreamBatch == null) {
-                        microStreamBatch = new MicroStreamBatch();
+                        microStreamBatch = new MicroStreamBatch(partitionId);
                         clearCounter();
                     }
                     StreamMessage streamMessage = peek(streamMessageQueue, 30000);
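
Taken together with the StreamingBootstrap change, this closes the loop: after a successful micro batch, StreamBuilder persists each partition's last consumed offset, and on the next start StreamingBootstrap reads the map back and resumes from it. A condensed view of the round trip; the StreamingManager and KylinConfig calls are the ones used in this diff, while everything around them is compressed:

    // Commit side (StreamBuilder.run, after consumer.consume(batch)):
    final HashMap<Integer, Long> offset = Maps.newHashMap();
    for (MicroStreamBatch microStreamBatch : batches) {
        offset.put(microStreamBatch.getPartitionId(),
                microStreamBatch.getOffset().getSecond());
    }
    StreamingManager.getInstance(KylinConfig.getInstanceFromEnv())
            .updateOffset(streaming, offset);

    // Restore side (StreamingBootstrap, before starting the consumers):
    Map<Integer, Long> partitionIdOffsetMap =
            streamingManager.getOffset(streamingConfig.getName(), allPartitions);

Because the commit runs only after consume() returns, a crash mid-batch replays from the previous checkpoint (at-least-once delivery). Note also the batches.size() > 1 guard: only the multi-partition cube path persists offsets in this commit, while the single-queue inverted-index path does not.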