You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/08/30 02:24:31 UTC

incubator-gearpump git commit: [GEARPUMP-197] fix busy loop in FetchThread

Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 529799cc4 -> 3c0ebb13f


[GEARPUMP-197] fix busy loop in FetchThread

Author: manuzhang <ow...@gmail.com>

Closes #77 from manuzhang/fix_fetch_thread.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/3c0ebb13
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/3c0ebb13
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/3c0ebb13

Branch: refs/heads/master
Commit: 3c0ebb13f64ee6b64623668dec6d3923d6f09cf6
Parents: 529799c
Author: manuzhang <ow...@gmail.com>
Authored: Tue Aug 30 10:24:24 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Tue Aug 30 10:24:24 2016 +0800

----------------------------------------------------------------------
 .../gearpump/streaming/kafka/util/KafkaConfig.java  |  3 +--
 .../kafka/lib/source/consumer/FetchThread.scala     | 16 +++++++++-------
 2 files changed, 10 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/3c0ebb13/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java b/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java
index 8c931cd..451faec 100644
--- a/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java
+++ b/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java
@@ -155,7 +155,7 @@ public class KafkaConfig extends AbstractConfig implements Serializable {
             CONSUMER_START_OFFSET_DOC)
         .define(ENABLE_AUTO_COMMIT_CONFIG,
             ConfigDef.Type.BOOLEAN,
-            true,
+            false,
             ConfigDef.Importance.MEDIUM,
             ENABLE_AUTO_COMMIT_DOC)
         .define(CHECKPOINT_STORE_NAME_PREFIX_CONFIG,
@@ -209,7 +209,6 @@ public class KafkaConfig extends AbstractConfig implements Serializable {
     if (!props.containsKey(GROUP_ID_CONFIG)) {
       props.put(GROUP_ID_CONFIG, getString(GROUP_ID_CONFIG));
     }
-    props.put(ENABLE_AUTO_COMMIT_CONFIG, "false");
 
     return new ConsumerConfig(props);
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/3c0ebb13/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/FetchThread.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/FetchThread.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/FetchThread.scala
index 3119f40..49c116c 100644
--- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/FetchThread.scala
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/FetchThread.scala
@@ -116,9 +116,9 @@ private[kafka] class FetchThread(
         resetConsumers(nextOffsets)
         reset = false
       }
-      val hasMoreMessages = fetchMessage
+      val fetchMore: Boolean = fetchMessage
       sleeper.reset()
-      if (!hasMoreMessages) {
+      if (!fetchMore) {
         // sleep for given duration
         sleeper.sleep(fetchSleepMS)
       }
@@ -133,19 +133,21 @@ private[kafka] class FetchThread(
 
   /**
    * fetch message from each TopicAndPartition in a round-robin way
+   *
+   * @return whether to fetch more messages
    */
   private def fetchMessage: Boolean = {
-    consumers.foldLeft(false) { (hasNext, tpAndConsumer) =>
-      val (_, consumer) = tpAndConsumer
-      if (incomingQueue.size < fetchThreshold) {
+    if (incomingQueue.size >= fetchThreshold) {
+      false
+    } else {
+      consumers.foldLeft(false) { (hasNext, tpAndConsumer) =>
+        val (_, consumer) = tpAndConsumer
         if (consumer.hasNext) {
           incomingQueue.put(consumer.next())
           true
         } else {
           hasNext
         }
-      } else {
-        true
       }
     }
   }