You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/07/03 05:00:57 UTC

incubator-kylin git commit: optimize OneOffStreamBuilder 1. replace Executors.newFixedThreadPool with newCachedThreadPool 2. retry when fail to get message from Kafka Producer, not just throw Exception

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8 6d920a9ff -> a88e10bf2


optimize OneOffStreamBuilder
1. replace Executors.newFixedThreadPool with newCachedThreadPool
2. retry when fail to get message from Kafka Producer, not just throw
Exception


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/a88e10bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/a88e10bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/a88e10bf

Branch: refs/heads/0.8
Commit: a88e10bf2030de73abf78e098d8a3b4b8f5369c4
Parents: 6d920a9
Author: qianhao.zhou <qi...@ebay.com>
Authored: Fri Jul 3 10:57:27 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Fri Jul 3 10:57:27 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/streaming/StreamingBootstrap.java       |  5 ++++-
 .../org/apache/kylin/streaming/StreamingUtil.java     | 14 ++++++++++----
 2 files changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a88e10bf/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index d4c5139..aaa4aa4 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -236,7 +236,10 @@ public class StreamingBootstrap {
         final List<BlockingQueue<StreamMessage>> queues = Lists.newLinkedList();
 
         int clusterId = 0;
-        final ExecutorService executorService = Executors.newFixedThreadPool(10, new DaemonThreadFactory());
+        final ExecutorService executorService = new ThreadPoolExecutor(10, 10,
+                15L, TimeUnit.SECONDS,
+                new SynchronousQueue<Runnable>(),
+                new DaemonThreadFactory());
         final long targetTimestamp = startTimestamp - margin;
         for (final KafkaClusterConfig kafkaClusterConfig : streamingConfig.getKafkaClusterConfigs()) {
             final ConcurrentMap<Integer, Long> partitionIdOffsetMap = Maps.newConcurrentMap();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a88e10bf/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
index 264c384..961c725 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
@@ -36,15 +36,21 @@ public final class StreamingUtil {
     private static MessageAndOffset getKafkaMessage(KafkaClusterConfig kafkaClusterConfig, int partitionId, long offset) {
         final String topic = kafkaClusterConfig.getTopic();
         int retry = 3;
-        while (retry > 0) {
-            final Broker leadBroker = Preconditions.checkNotNull(getLeadBroker(kafkaClusterConfig, partitionId), "unable to find leadBroker with config:" + kafkaClusterConfig + " partitionId:" + partitionId);
+        while (retry-- > 0) {
+            final Broker leadBroker = getLeadBroker(kafkaClusterConfig, partitionId);
+            if (leadBroker == null) {
+                logger.warn("unable to find leadBroker with config:" + kafkaClusterConfig + " partitionId:" + partitionId);
+                continue;
+            }
             final FetchResponse response = KafkaRequester.fetchResponse(topic, partitionId, offset, leadBroker, kafkaClusterConfig);
-            Preconditions.checkArgument(response.errorCode(topic, partitionId) == 0, "errorCode of FetchResponse is:" + response.errorCode(topic, partitionId));
+            if (response.errorCode(topic, partitionId) != 0) {
+                logger.warn("errorCode of FetchResponse is:" + response.errorCode(topic, partitionId));
+                continue;
+            }
             final Iterator<MessageAndOffset> iterator = response.messageSet(topic, partitionId).iterator();
             if (iterator.hasNext()) {
                 return iterator.next();
             } else {
-                retry--;
                 try {
                     Thread.sleep(1000);
                 } catch (InterruptedException e) {