You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/07/03 11:37:29 UTC
incubator-kylin git commit: hotfix: still use fixed thread pool
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8 633123732 -> d7bf59020
hotfix: still use fixed thread pool
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/d7bf5902
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/d7bf5902
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/d7bf5902
Branch: refs/heads/0.8
Commit: d7bf59020ef5037fc3b53d5825836caa4e4f54cf
Parents: 6331237
Author: honma <ho...@ebay.com>
Authored: Fri Jul 3 17:37:15 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri Jul 3 17:37:15 2015 +0800
----------------------------------------------------------------------
.../kylin/job/streaming/StreamingBootstrap.java | 64 ++++++++++----------
1 file changed, 31 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7bf5902/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index aaa4aa4..d3013cb 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -236,10 +236,7 @@ public class StreamingBootstrap {
final List<BlockingQueue<StreamMessage>> queues = Lists.newLinkedList();
int clusterId = 0;
- final ExecutorService executorService = new ThreadPoolExecutor(10, 10,
- 15L, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>(),
- new DaemonThreadFactory());
+ final ExecutorService executorService = Executors.newFixedThreadPool(10, new DaemonThreadFactory());
final long targetTimestamp = startTimestamp - margin;
for (final KafkaClusterConfig kafkaClusterConfig : streamingConfig.getKafkaClusterConfigs()) {
final ConcurrentMap<Integer, Long> partitionIdOffsetMap = Maps.newConcurrentMap();
@@ -268,6 +265,7 @@ public class StreamingBootstrap {
logger.info("Cluster {} with {} partitions", clusterId, oneClusterQueue.size());
clusterId++;
}
+ executorService.shutdown();
logger.info(String.format("starting one off streaming build with timestamp{%d, %d}", startTimestamp, endTimestamp));
OneOffStreamBuilder oneOffStreamBuilder = new OneOffStreamBuilder(streamingConfig.getName(), queues, streamParser, new CubeStreamConsumer(cubeName), startTimestamp, endTimestamp, margin);
@@ -275,35 +273,35 @@ public class StreamingBootstrap {
logger.info("one off build finished");
}
-// private void startCalculatingMargin(final StreamingConfig streamingConfig) throws Exception {
-// final String cubeName = streamingConfig.getCubeName();
-// final StreamParser streamParser = getStreamParser(streamingConfig, Lists.<TblColRef>newArrayList());
-// final List<BlockingQueue<StreamMessage>> queues = Lists.newLinkedList();
-//
-// int clusterId = 0;
-// final List<Pair<Long,Long>> firstAndLastOffsets = Lists.newArrayList();
-//
-// for (final KafkaClusterConfig kafkaClusterConfig : streamingConfig.getKafkaClusterConfigs()) {
-// final ConcurrentMap<Integer, Long> partitionIdOffsetMap = Maps.newConcurrentMap();
-// final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
-// for (int i = 0; i < partitionCount; ++i) {
-// Pair<Long,Long> firstlast = StreamingUtil.getFirstAndLastOffset(kafkaClusterConfig,i);
-// firstAndLastOffsets.add(firstlast);
-// partitionIdOffsetMap.putIfAbsent(i,firstlast.getFirst());
-// }
-//
-// logger.info("partitionId to start offset map:" + partitionIdOffsetMap);
-// Preconditions.checkArgument(partitionIdOffsetMap.size() == partitionCount, "fail to get all start offset");
-// final List<BlockingQueue<StreamMessage>> oneClusterQueue = consume(clusterId, kafkaClusterConfig, partitionCount, partitionIdOffsetMap, 0);
-// queues.addAll(oneClusterQueue);
-// logger.info("Cluster {} with {} partitions", clusterId, oneClusterQueue.size());
-// clusterId++;
-// }
-//
-// OneOffStreamBuilder oneOffStreamBuilder = new OneOffStreamBuilder(streamingConfig.getName(), queues, streamParser, new CubeStreamConsumer(cubeName), startTimestamp, endTimestamp, margin);
-// Executors.newSingleThreadExecutor().submit(oneOffStreamBuilder).get();
-// logger.info("one off build finished");
-// }
+ // private void startCalculatingMargin(final StreamingConfig streamingConfig) throws Exception {
+ // final String cubeName = streamingConfig.getCubeName();
+ // final StreamParser streamParser = getStreamParser(streamingConfig, Lists.<TblColRef>newArrayList());
+ // final List<BlockingQueue<StreamMessage>> queues = Lists.newLinkedList();
+ //
+ // int clusterId = 0;
+ // final List<Pair<Long,Long>> firstAndLastOffsets = Lists.newArrayList();
+ //
+ // for (final KafkaClusterConfig kafkaClusterConfig : streamingConfig.getKafkaClusterConfigs()) {
+ // final ConcurrentMap<Integer, Long> partitionIdOffsetMap = Maps.newConcurrentMap();
+ // final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
+ // for (int i = 0; i < partitionCount; ++i) {
+ // Pair<Long,Long> firstlast = StreamingUtil.getFirstAndLastOffset(kafkaClusterConfig,i);
+ // firstAndLastOffsets.add(firstlast);
+ // partitionIdOffsetMap.putIfAbsent(i,firstlast.getFirst());
+ // }
+ //
+ // logger.info("partitionId to start offset map:" + partitionIdOffsetMap);
+ // Preconditions.checkArgument(partitionIdOffsetMap.size() == partitionCount, "fail to get all start offset");
+ // final List<BlockingQueue<StreamMessage>> oneClusterQueue = consume(clusterId, kafkaClusterConfig, partitionCount, partitionIdOffsetMap, 0);
+ // queues.addAll(oneClusterQueue);
+ // logger.info("Cluster {} with {} partitions", clusterId, oneClusterQueue.size());
+ // clusterId++;
+ // }
+ //
+ // OneOffStreamBuilder oneOffStreamBuilder = new OneOffStreamBuilder(streamingConfig.getName(), queues, streamParser, new CubeStreamConsumer(cubeName), startTimestamp, endTimestamp, margin);
+ // Executors.newSingleThreadExecutor().submit(oneOffStreamBuilder).get();
+ // logger.info("one off build finished");
+ // }
private void startIIStreaming(StreamingConfig streamingConfig, final int partitionId) throws Exception {