You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/08/30 02:24:31 UTC
incubator-gearpump git commit: [GEARPUMP-197] fix busy loop in
FetchThread
Repository: incubator-gearpump
Updated Branches:
refs/heads/master 529799cc4 -> 3c0ebb13f
[GEARPUMP-197] fix busy loop in FetchThread
Author: manuzhang <ow...@gmail.com>
Closes #77 from manuzhang/fix_fetch_thread.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/3c0ebb13
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/3c0ebb13
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/3c0ebb13
Branch: refs/heads/master
Commit: 3c0ebb13f64ee6b64623668dec6d3923d6f09cf6
Parents: 529799c
Author: manuzhang <ow...@gmail.com>
Authored: Tue Aug 30 10:24:24 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Tue Aug 30 10:24:24 2016 +0800
----------------------------------------------------------------------
.../gearpump/streaming/kafka/util/KafkaConfig.java | 3 +--
.../kafka/lib/source/consumer/FetchThread.scala | 16 +++++++++-------
2 files changed, 10 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/3c0ebb13/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java b/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java
index 8c931cd..451faec 100644
--- a/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java
+++ b/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java
@@ -155,7 +155,7 @@ public class KafkaConfig extends AbstractConfig implements Serializable {
CONSUMER_START_OFFSET_DOC)
.define(ENABLE_AUTO_COMMIT_CONFIG,
ConfigDef.Type.BOOLEAN,
- true,
+ false,
ConfigDef.Importance.MEDIUM,
ENABLE_AUTO_COMMIT_DOC)
.define(CHECKPOINT_STORE_NAME_PREFIX_CONFIG,
@@ -209,7 +209,6 @@ public class KafkaConfig extends AbstractConfig implements Serializable {
if (!props.containsKey(GROUP_ID_CONFIG)) {
props.put(GROUP_ID_CONFIG, getString(GROUP_ID_CONFIG));
}
- props.put(ENABLE_AUTO_COMMIT_CONFIG, "false");
return new ConsumerConfig(props);
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/3c0ebb13/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/FetchThread.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/FetchThread.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/FetchThread.scala
index 3119f40..49c116c 100644
--- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/FetchThread.scala
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/FetchThread.scala
@@ -116,9 +116,9 @@ private[kafka] class FetchThread(
resetConsumers(nextOffsets)
reset = false
}
- val hasMoreMessages = fetchMessage
+ val fetchMore: Boolean = fetchMessage
sleeper.reset()
- if (!hasMoreMessages) {
+ if (!fetchMore) {
// sleep for given duration
sleeper.sleep(fetchSleepMS)
}
@@ -133,19 +133,21 @@ private[kafka] class FetchThread(
/**
* fetch message from each TopicAndPartition in a round-robin way
+ *
+ * @return whether to fetch more messages
*/
private def fetchMessage: Boolean = {
- consumers.foldLeft(false) { (hasNext, tpAndConsumer) =>
- val (_, consumer) = tpAndConsumer
- if (incomingQueue.size < fetchThreshold) {
+ if (incomingQueue.size >= fetchThreshold) {
+ false
+ } else {
+ consumers.foldLeft(false) { (hasNext, tpAndConsumer) =>
+ val (_, consumer) = tpAndConsumer
if (consumer.hasNext) {
incomingQueue.put(consumer.next())
true
} else {
hasNext
}
- } else {
- true
}
}
}