You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/06/05 06:35:50 UTC
[2/3] incubator-kylin git commit: refactor
refactor
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/11e9d3e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/11e9d3e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/11e9d3e9
Branch: refs/heads/0.8.0
Commit: 11e9d3e92d3e886006ee526a2b3581bcf00f2bfe
Parents: 6133d73
Author: qianhao.zhou <qi...@ebay.com>
Authored: Thu Jun 4 20:29:30 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Fri Jun 5 11:57:01 2015 +0800
----------------------------------------------------------------------
.../kylin/job/streaming/StreamingBootstrap.java | 38 +++++++++++++++-----
.../apache/kylin/job/BuildIIWithStreamTest.java | 3 +-
.../kylin/job/hadoop/invertedindex/IITest.java | 2 +-
.../job/streaming/CubeStreamConsumerTest.java | 2 +-
.../kylin/streaming/MicroStreamBatch.java | 10 +++++-
.../apache/kylin/streaming/StreamBuilder.java | 31 +++++++++++++---
6 files changed, 69 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/11e9d3e9/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index fd590d2..436c55b 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -62,6 +62,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.lang.reflect.Constructor;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@@ -135,16 +136,21 @@ public class StreamingBootstrap {
}
}
- private List<BlockingQueue<StreamMessage>> consume(int clusterID, KafkaClusterConfig kafkaClusterConfig, final int partitionCount) {
+ private List<BlockingQueue<StreamMessage>> consume(int clusterID, KafkaClusterConfig kafkaClusterConfig, final int partitionCount, final Map<Integer, Long> partitionIdOffsetMap, final int partitionOffset) {
List<BlockingQueue<StreamMessage>> result = Lists.newArrayList();
for (int partitionId = 0; partitionId < partitionCount; ++partitionId) {
final Broker leadBroker = getLeadBroker(kafkaClusterConfig, partitionId);
-
+ final int transferredPartitionId = partitionId + partitionOffset;
final long latestOffset = KafkaRequester.getLastOffset(kafkaClusterConfig.getTopic(), partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaClusterConfig);
long streamingOffset = latestOffset;
- logger.info("submitting offset:" + streamingOffset);
-
- KafkaConsumer consumer = new KafkaConsumer(clusterID, kafkaClusterConfig.getTopic(), partitionId, streamingOffset, kafkaClusterConfig.getBrokers(), kafkaClusterConfig, 1);
+ if (partitionIdOffsetMap.containsKey(transferredPartitionId)) {
+ final long earliestOffset = KafkaRequester.getLastOffset(kafkaClusterConfig.getTopic(), partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaClusterConfig);
+ long committedOffset = partitionIdOffsetMap.get(transferredPartitionId);
+ Preconditions.checkArgument(committedOffset >= earliestOffset && committedOffset <= latestOffset, String.format("invalid offset:%d, earliestOffset:%d, latestOffset:%d", committedOffset, earliestOffset, latestOffset));
+ streamingOffset = committedOffset;
+ }
+ logger.info("starting offset:" + streamingOffset + " cluster id:" + clusterID + " partitionId:" + partitionId + " transferredPartitionId:" + transferredPartitionId);
+ KafkaConsumer consumer = new KafkaConsumer(clusterID, kafkaClusterConfig.getTopic(), partitionId, streamingOffset, kafkaClusterConfig.getBrokers(), kafkaClusterConfig);
Executors.newSingleThreadExecutor().submit(consumer);
result.add(consumer.getStreamQueue(0));
}
@@ -156,15 +162,28 @@ public class StreamingBootstrap {
final List<BlockingQueue<StreamMessage>> allClustersData = Lists.newArrayList();
+ ArrayList<Integer> allPartitions = Lists.newArrayList();
+ int partitionOffset = 0;
+ for (KafkaClusterConfig kafkaClusterConfig : kafkaClusterConfigs) {
+ final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
+ for (int i = 0; i < partitionCount; i++) {
+ allPartitions.add(i + partitionOffset);
+ }
+ partitionOffset += partitionCount;
+ }
+ final Map<Integer, Long> partitionIdOffsetMap = streamingManager.getOffset(streamingConfig.getName(), allPartitions);
+
int clusterID = 0;
+ partitionOffset = 0;
for (KafkaClusterConfig kafkaClusterConfig : kafkaClusterConfigs) {
final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
Preconditions.checkArgument(partitionId >= 0 && partitionId < partitionCount, "invalid partition id:" + partitionId);
- final List<BlockingQueue<StreamMessage>> oneClusterData = consume(clusterID, kafkaClusterConfig, partitionCount);
+ final List<BlockingQueue<StreamMessage>> oneClusterData = consume(clusterID, kafkaClusterConfig, partitionCount, partitionIdOffsetMap, partitionOffset);
logger.info("Cluster {} with {} partitions", allClustersData.size(), oneClusterData.size());
allClustersData.addAll(oneClusterData);
clusterID++;
+ partitionOffset += partitionCount;
}
final String cubeName = streamingConfig.getCubeName();
@@ -174,7 +193,7 @@ public class StreamingBootstrap {
MicroBatchCondition condition = new MicroBatchCondition(Integer.MAX_VALUE, batchInterval);
long startTimestamp = cubeInstance.getDateRangeEnd() == 0 ? TimeUtil.getNextPeriodStart(System.currentTimeMillis(), (long) batchInterval) : cubeInstance.getDateRangeEnd();
logger.info("batch time interval is {} to {}", DateFormat.formatToTimeStr(startTimestamp), DateFormat.formatToTimeStr(startTimestamp + batchInterval));
- StreamBuilder cubeStreamBuilder = new StreamBuilder(allClustersData, condition, new CubeStreamConsumer(cubeName), startTimestamp);
+ StreamBuilder cubeStreamBuilder = new StreamBuilder(streamingConfig.getName(), allClustersData, condition, new CubeStreamConsumer(cubeName), startTimestamp);
cubeStreamBuilder.setStreamParser(getStreamParser(streamingConfig, Lists.transform(new CubeJoinedFlatTableDesc(cubeInstance.getDescriptor(), null).getColumnList(), new Function<IntermediateColumnDesc, TblColRef>() {
@Nullable
@Override
@@ -242,7 +261,10 @@ public class StreamingBootstrap {
Executors.newSingleThreadExecutor().submit(consumer);
final ExecutorService streamingBuilderPool = Executors.newFixedThreadPool(parallelism);
for (int i = startShard; i < endShard; ++i) {
- final StreamBuilder task = new StreamBuilder(consumer.getStreamQueue(i % parallelism), new MicroBatchCondition(iiDesc.getSliceSize(), Integer.MAX_VALUE), new IIStreamConsumer(streamingConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiDesc, i), 0L);
+ final StreamBuilder task = new StreamBuilder(streamingConfig.getName(),
+ consumer.getStreamQueue(i % parallelism),
+ new MicroBatchCondition(iiDesc.getSliceSize(), Integer.MAX_VALUE),
+ new IIStreamConsumer(streamingConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiDesc, i), 0L);
task.setStreamParser(getStreamParser(streamingConfig, ii.getDescriptor().listAllColumns()));
if (i == endShard - 1) {
streamingBuilderPool.submit(task).get();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/11e9d3e9/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
index e84b176..6d660e5 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
@@ -214,7 +214,8 @@ public class BuildIIWithStreamTest {
ToolRunner.run(new IICreateHTableJob(), args);
ExecutorService executorService = Executors.newSingleThreadExecutor();
- final StreamBuilder streamBuilder = new StreamBuilder(queue,
+ final StreamBuilder streamBuilder = new StreamBuilder(iiName,
+ queue,
new MicroBatchCondition(segment.getIIDesc().getSliceSize(), Integer.MAX_VALUE),
new IIStreamConsumer(iiName, segment.getStorageLocationIdentifier(), segment.getIIDesc(), 0),
0);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/11e9d3e9/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index b2fb5ea..9a28ea4 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -85,7 +85,7 @@ public class IITest extends LocalFileMetadataTestCase {
List<List<String>> parsedStreamMessages = Lists.newArrayList();
StreamParser parser = StringStreamParser.instance;
- MicroStreamBatch batch = new MicroStreamBatch();
+ MicroStreamBatch batch = new MicroStreamBatch(0);
for (StreamMessage message : streamMessages) {
ParsedStreamMessage parsedStreamMessage = parser.parse(message);
if ((parsedStreamMessage.isAccepted())) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/11e9d3e9/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
index 328ec72..ca0d037 100644
--- a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
+++ b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
@@ -64,7 +64,7 @@ public class CubeStreamConsumerTest {
@Test
public void test() throws Exception {
LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<>();
- StreamBuilder cubeStreamBuilder = new StreamBuilder(queue, new MicroBatchCondition(Integer.MAX_VALUE, 30 * 1000), new CubeStreamConsumer(CUBE_NAME), System.currentTimeMillis());
+ StreamBuilder cubeStreamBuilder = new StreamBuilder(CUBE_NAME, queue, new MicroBatchCondition(Integer.MAX_VALUE, 30 * 1000), new CubeStreamConsumer(CUBE_NAME), System.currentTimeMillis());
final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
loadDataFromLocalFile(queue, 100000);
future.get();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/11e9d3e9/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java b/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
index e1ff60c..6ecd592 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
@@ -9,6 +9,8 @@ import java.util.List;
*/
public final class MicroStreamBatch {
+ private final int partitionId;
+
private final List<List<String>> streams;
private final Pair<Long, Long> timestamp;
@@ -17,18 +19,24 @@ public final class MicroStreamBatch {
private int rawMessageCount;
- public MicroStreamBatch() {
+ public MicroStreamBatch(int partitionId) {
+ this.partitionId = partitionId;
this.streams = Lists.newLinkedList();
this.timestamp = Pair.newPair(Long.MAX_VALUE, Long.MIN_VALUE);
this.offset = Pair.newPair(Long.MAX_VALUE, Long.MIN_VALUE);
}
private MicroStreamBatch(MicroStreamBatch batch) {
+ this.partitionId = batch.partitionId;
this.streams = Lists.newLinkedList(batch.streams);
this.timestamp = Pair.newPair(batch.timestamp.getFirst(), batch.timestamp.getSecond());
this.offset = Pair.newPair(batch.offset.getFirst(), batch.offset.getSecond());
}
+ public int getPartitionId() {
+ return partitionId;
+ }
+
public final List<List<String>> getStreams() {
return streams;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/11e9d3e9/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index 8e5b0ca..cb8dfb1 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -36,11 +36,14 @@ package org.apache.kylin.streaming;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.DateFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.concurrent.*;
@@ -59,8 +62,11 @@ public class StreamBuilder implements Runnable {
private final MicroBatchCondition condition;
- public StreamBuilder(List<BlockingQueue<StreamMessage>> inputs, MicroBatchCondition condition, MicroStreamBatchConsumer consumer, long startTimestamp) {
+ private final String streaming;
+
+ public StreamBuilder(String streaming, List<BlockingQueue<StreamMessage>> inputs, MicroBatchCondition condition, MicroStreamBatchConsumer consumer, long startTimestamp) {
Preconditions.checkArgument(inputs.size() > 0);
+ this.streaming = streaming;
this.streamMessageQueues = Lists.newArrayList();
this.consumer = Preconditions.checkNotNull(consumer);
this.condition = condition;
@@ -68,7 +74,8 @@ public class StreamBuilder implements Runnable {
init(inputs);
}
- public StreamBuilder(BlockingQueue<StreamMessage> input, MicroBatchCondition condition, MicroStreamBatchConsumer consumer, long startTimestamp) {
+ public StreamBuilder(String streaming, BlockingQueue<StreamMessage> input, MicroBatchCondition condition, MicroStreamBatchConsumer consumer, long startTimestamp) {
+ this.streaming = streaming;
this.streamMessageQueues = Lists.newArrayList();
this.consumer = Preconditions.checkNotNull(consumer);
this.condition = condition;
@@ -90,11 +97,16 @@ public class StreamBuilder implements Runnable {
final int inputCount = streamMessageQueues.size();
final ExecutorService executorService = Executors.newFixedThreadPool(inputCount);
long start = startTimestamp;
+ List<Integer> partitions = Lists.newArrayList();
+ for (int i = 0, partitionCount = streamMessageQueues.size(); i < partitionCount; i++) {
+ partitions.add(i);
+ }
while (true) {
CountDownLatch countDownLatch = new CountDownLatch(inputCount);
ArrayList<Future<MicroStreamBatch>> futures = Lists.newArrayListWithExpectedSize(inputCount);
+ int partitionId = 0;
for (BlockingQueue<StreamMessage> streamMessageQueue : streamMessageQueues) {
- futures.add(executorService.submit(new StreamFetcher(streamMessageQueue, countDownLatch, start, start + condition.getBatchInterval())));
+ futures.add(executorService.submit(new StreamFetcher(partitionId++, streamMessageQueue, countDownLatch, start, start + condition.getBatchInterval())));
}
countDownLatch.await();
ArrayList<MicroStreamBatch> batches = Lists.newArrayListWithExpectedSize(inputCount);
@@ -126,6 +138,13 @@ public class StreamBuilder implements Runnable {
long startTime = System.currentTimeMillis();
consumer.consume(batch);
logger.info("Batch build costs {} milliseconds", System.currentTimeMillis() - startTime);
+ if (batches.size() > 1) {
+ final HashMap<Integer, Long> offset = Maps.newHashMap();
+ for (MicroStreamBatch microStreamBatch : batches) {
+ offset.put(microStreamBatch.getPartitionId(), microStreamBatch.getOffset().getSecond());
+ }
+ StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).updateOffset(streaming, offset);
+ }
}
}
} catch (InterruptedException e) {
@@ -143,10 +162,12 @@ public class StreamBuilder implements Runnable {
private final BlockingQueue<StreamMessage> streamMessageQueue;
private final CountDownLatch countDownLatch;
+ private final int partitionId;
private long startTimestamp;
private long endTimestamp;
- public StreamFetcher(BlockingQueue<StreamMessage> streamMessageQueue, CountDownLatch countDownLatch, long startTimestamp, long endTimestamp) {
+ public StreamFetcher(int partitionId, BlockingQueue<StreamMessage> streamMessageQueue, CountDownLatch countDownLatch, long startTimestamp, long endTimestamp) {
+ this.partitionId = partitionId;
this.streamMessageQueue = streamMessageQueue;
this.countDownLatch = countDownLatch;
this.startTimestamp = startTimestamp;
@@ -183,7 +204,7 @@ public class StreamBuilder implements Runnable {
MicroStreamBatch microStreamBatch = null;
while (true) {
if (microStreamBatch == null) {
- microStreamBatch = new MicroStreamBatch();
+ microStreamBatch = new MicroStreamBatch(partitionId);
clearCounter();
}
StreamMessage streamMessage = peek(streamMessageQueue, 30000);