You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/07/06 10:40:20 UTC

[2/2] incubator-kylin git commit: increase retry wait time exponentially when failed to fetch Kafka Response, max wait time is about one minute

increase retry wait time exponentially when failed to fetch Kafka
Response, max wait time is about one minute


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/1105c554
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/1105c554
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/1105c554

Branch: refs/heads/0.8
Commit: 1105c554397f27e2533fb880d32f143635c6d653
Parents: 876f34d
Author: qianhao.zhou <qi...@ebay.com>
Authored: Mon Jul 6 16:38:54 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Mon Jul 6 16:38:54 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/streaming/StreamingUtil.java   | 28 +++++++++++++-------
 1 file changed, 19 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1105c554/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
index 0019a10..4e7234f 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
@@ -21,6 +21,8 @@ public final class StreamingUtil {
 
     private static final Logger logger = LoggerFactory.getLogger(StreamingUtil.class);
 
+    private static final int MAX_RETRY_TIMES = 6;
+
     private StreamingUtil() {
     }
 
@@ -33,31 +35,39 @@ public final class StreamingUtil {
         }
     }
 
+    private static void sleep(int retryTimes) {
+        int seconds = (int) Math.pow(2, retryTimes);
+        logger.info("retry times:" + retryTimes + " sleep:" + seconds + " seconds");
+        try {
+            Thread.sleep(seconds * 1000);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     private static MessageAndOffset getKafkaMessage(KafkaClusterConfig kafkaClusterConfig, int partitionId, long offset) {
         final String topic = kafkaClusterConfig.getTopic();
         int retry = 0;
-        while (retry++ < 4) {
+        while (retry < MAX_RETRY_TIMES) {//max sleep time 63 seconds
             final Broker leadBroker = getLeadBroker(kafkaClusterConfig, partitionId);
             if (leadBroker == null) {
                 logger.warn("unable to find leadBroker with config:" + kafkaClusterConfig + " partitionId:" + partitionId);
+                sleep(retry++);
                 continue;
             }
             final FetchResponse response = KafkaRequester.fetchResponse(topic, partitionId, offset, leadBroker, kafkaClusterConfig);
             if (response.errorCode(topic, partitionId) != 0) {
                 logger.warn("errorCode of FetchResponse is:" + response.errorCode(topic, partitionId));
+                sleep(retry++);
                 continue;
             }
             final Iterator<MessageAndOffset> iterator = response.messageSet(topic, partitionId).iterator();
-            if (iterator.hasNext()) {
-                return iterator.next();
-            } else {
-                try {
-                    Thread.sleep((long) (Math.pow(2, retry) * 1000));
-                } catch (InterruptedException e) {
-                    throw new RuntimeException(e);
-                }
+            if (!iterator.hasNext()) {
+                logger.warn("messageSet is empty");
+                sleep(retry++);
                 continue;
             }
+            return iterator.next();
         }
         throw new IllegalStateException(String.format("try to get timestamp of topic: %s, partitionId: %d, offset: %d, failed to get StreamMessage from kafka", topic, partitionId, offset));
     }