Posted to commits@kylin.apache.org by ma...@apache.org on 2015/07/03 11:37:29 UTC

incubator-kylin git commit: hotfix: still use fixed thread pool

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8 633123732 -> d7bf59020


hotfix: still use fixed thread pool


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/d7bf5902
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/d7bf5902
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/d7bf5902

Branch: refs/heads/0.8
Commit: d7bf59020ef5037fc3b53d5825836caa4e4f54cf
Parents: 6331237
Author: honma <ho...@ebay.com>
Authored: Fri Jul 3 17:37:15 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri Jul 3 17:37:15 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/streaming/StreamingBootstrap.java | 64 ++++++++++----------
 1 file changed, 31 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
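
For context on the hunk below: it replaces a ThreadPoolExecutor built over a
SynchronousQueue with Executors.newFixedThreadPool(10, ...). The difference is
in saturation behavior. A SynchronousQueue has no capacity, so with core ==
max == 10, an 11th concurrent task cannot be handed off or queued and the
default AbortPolicy throws RejectedExecutionException; a fixed pool is backed
by an unbounded LinkedBlockingQueue, so extra tasks simply wait. A minimal,
self-contained sketch of that difference (using the JDK default thread factory
in place of Kylin's DaemonThreadFactory, and a hypothetical class name):

    import java.util.concurrent.*;

    public class PoolComparison {
        public static void main(String[] args) {
            // The replaced construction: 10 threads over a SynchronousQueue.
            // Once all 10 threads are busy, the 11th submission can neither
            // be handed off nor queued, so AbortPolicy rejects it.
            ExecutorService handoff = new ThreadPoolExecutor(10, 10,
                    15L, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>());

            // The replacement: a fixed pool over an unbounded
            // LinkedBlockingQueue, so submissions beyond 10 simply queue.
            ExecutorService fixed = Executors.newFixedThreadPool(10);

            Runnable busy = () -> {
                try { Thread.sleep(1000); } catch (InterruptedException ignored) {}
            };

            for (int i = 0; i < 11; i++) {
                fixed.submit(busy); // the 11th task waits in the queue
            }

            try {
                for (int i = 0; i < 11; i++) {
                    handoff.submit(busy); // the 11th submit is rejected
                }
            } catch (RejectedExecutionException e) {
                System.out.println("handoff pool rejected a task: " + e);
            }

            handoff.shutdown();
            fixed.shutdown();
        }
    }

With the fixed pool, one consumer task per partition can be submitted even
when the partitions outnumber the 10 threads; the extras wait in the queue
instead of failing the streaming bootstrap.
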


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7bf5902/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index aaa4aa4..d3013cb 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -236,10 +236,7 @@ public class StreamingBootstrap {
         final List<BlockingQueue<StreamMessage>> queues = Lists.newLinkedList();
 
         int clusterId = 0;
-        final ExecutorService executorService = new ThreadPoolExecutor(10, 10,
-                15L, TimeUnit.SECONDS,
-                new SynchronousQueue<Runnable>(),
-                new DaemonThreadFactory());
+        final ExecutorService executorService = Executors.newFixedThreadPool(10, new DaemonThreadFactory());
         final long targetTimestamp = startTimestamp - margin;
         for (final KafkaClusterConfig kafkaClusterConfig : streamingConfig.getKafkaClusterConfigs()) {
             final ConcurrentMap<Integer, Long> partitionIdOffsetMap = Maps.newConcurrentMap();
@@ -268,6 +265,7 @@ public class StreamingBootstrap {
             logger.info("Cluster {} with {} partitions", clusterId, oneClusterQueue.size());
             clusterId++;
         }
+        executorService.shutdown();
 
         logger.info(String.format("starting one off streaming build with timestamp{%d, %d}", startTimestamp, endTimestamp));
         OneOffStreamBuilder oneOffStreamBuilder = new OneOffStreamBuilder(streamingConfig.getName(), queues, streamParser, new CubeStreamConsumer(cubeName), startTimestamp, endTimestamp, margin);
@@ -275,35 +273,35 @@ public class StreamingBootstrap {
         logger.info("one off build finished");
     }
 
-//    private void startCalculatingMargin(final StreamingConfig streamingConfig) throws Exception {
-//        final String cubeName = streamingConfig.getCubeName();
-//        final StreamParser streamParser = getStreamParser(streamingConfig, Lists.<TblColRef>newArrayList());
-//        final List<BlockingQueue<StreamMessage>> queues = Lists.newLinkedList();
-//
-//        int clusterId = 0;
-//        final List<Pair<Long,Long>> firstAndLastOffsets = Lists.newArrayList();
-//
-//        for (final KafkaClusterConfig kafkaClusterConfig : streamingConfig.getKafkaClusterConfigs()) {
-//            final ConcurrentMap<Integer, Long> partitionIdOffsetMap = Maps.newConcurrentMap();
-//            final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
-//            for (int i = 0; i < partitionCount; ++i) {
-//                Pair<Long,Long> firstlast = StreamingUtil.getFirstAndLastOffset(kafkaClusterConfig,i);
-//                firstAndLastOffsets.add(firstlast);
-//                partitionIdOffsetMap.putIfAbsent(i,firstlast.getFirst());
-//            }
-//
-//            logger.info("partitionId to start offset map:" + partitionIdOffsetMap);
-//            Preconditions.checkArgument(partitionIdOffsetMap.size() == partitionCount, "fail to get all start offset");
-//            final List<BlockingQueue<StreamMessage>> oneClusterQueue = consume(clusterId, kafkaClusterConfig, partitionCount, partitionIdOffsetMap, 0);
-//            queues.addAll(oneClusterQueue);
-//            logger.info("Cluster {} with {} partitions", clusterId, oneClusterQueue.size());
-//            clusterId++;
-//        }
-//
-//        OneOffStreamBuilder oneOffStreamBuilder = new OneOffStreamBuilder(streamingConfig.getName(), queues, streamParser, new CubeStreamConsumer(cubeName), startTimestamp, endTimestamp, margin);
-//        Executors.newSingleThreadExecutor().submit(oneOffStreamBuilder).get();
-//        logger.info("one off build finished");
-//    }
+    //    private void startCalculatingMargin(final StreamingConfig streamingConfig) throws Exception {
+    //        final String cubeName = streamingConfig.getCubeName();
+    //        final StreamParser streamParser = getStreamParser(streamingConfig, Lists.<TblColRef>newArrayList());
+    //        final List<BlockingQueue<StreamMessage>> queues = Lists.newLinkedList();
+    //
+    //        int clusterId = 0;
+    //        final List<Pair<Long,Long>> firstAndLastOffsets = Lists.newArrayList();
+    //
+    //        for (final KafkaClusterConfig kafkaClusterConfig : streamingConfig.getKafkaClusterConfigs()) {
+    //            final ConcurrentMap<Integer, Long> partitionIdOffsetMap = Maps.newConcurrentMap();
+    //            final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
+    //            for (int i = 0; i < partitionCount; ++i) {
+    //                Pair<Long,Long> firstlast = StreamingUtil.getFirstAndLastOffset(kafkaClusterConfig,i);
+    //                firstAndLastOffsets.add(firstlast);
+    //                partitionIdOffsetMap.putIfAbsent(i,firstlast.getFirst());
+    //            }
+    //
+    //            logger.info("partitionId to start offset map:" + partitionIdOffsetMap);
+    //            Preconditions.checkArgument(partitionIdOffsetMap.size() == partitionCount, "fail to get all start offset");
+    //            final List<BlockingQueue<StreamMessage>> oneClusterQueue = consume(clusterId, kafkaClusterConfig, partitionCount, partitionIdOffsetMap, 0);
+    //            queues.addAll(oneClusterQueue);
+    //            logger.info("Cluster {} with {} partitions", clusterId, oneClusterQueue.size());
+    //            clusterId++;
+    //        }
+    //
+    //        OneOffStreamBuilder oneOffStreamBuilder = new OneOffStreamBuilder(streamingConfig.getName(), queues, streamParser, new CubeStreamConsumer(cubeName), startTimestamp, endTimestamp, margin);
+    //        Executors.newSingleThreadExecutor().submit(oneOffStreamBuilder).get();
+    //        logger.info("one off build finished");
+    //    }
 
     private void startIIStreaming(StreamingConfig streamingConfig, final int partitionId) throws Exception {
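
One note on the second hunk above, which adds executorService.shutdown()
after the consumer loop: in plain JDK semantics (nothing Kylin-specific),
shutdown() is an orderly shutdown. It stops the pool from accepting new
tasks but lets already-submitted consumers run to completion, and
awaitTermination() can be used to block until that work drains. A minimal
sketch, with a hypothetical class name:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class ShutdownDemo {
        public static void main(String[] args) throws InterruptedException {
            ExecutorService pool = Executors.newFixedThreadPool(2);
            pool.submit(() -> System.out.println("consumer running"));

            // Orderly shutdown: no new tasks accepted, submitted tasks
            // still run to completion.
            pool.shutdown();

            // Optionally block until the submitted work has drained.
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // interrupt stragglers if the wait expires
            }
        }
    }
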