You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/04/01 19:16:51 UTC
git commit: SAMZA-213; update message chooser even when backing off polling of consumers
Repository: incubator-samza
Updated Branches:
refs/heads/master 10c6338a0 -> 4d0ad620b
SAMZA-213; update message chooser even when backing off polling of consumers
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/4d0ad620
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/4d0ad620
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/4d0ad620
Branch: refs/heads/master
Commit: 4d0ad620b7879ae6d051a34766a65cd8e5554b63
Parents: 10c6338
Author: Anh Thu Vu <vu...@gmail.com>
Authored: Tue Apr 1 10:16:45 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-ld.linkedin.biz>
Committed: Tue Apr 1 10:16:45 2014 -0700
----------------------------------------------------------------------
build.gradle | 2 +-
.../apache/samza/system/SystemConsumers.scala | 28 +++++++++++---------
2 files changed, 17 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4d0ad620/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 8e369b8..4775087 100644
--- a/build.gradle
+++ b/build.gradle
@@ -96,7 +96,7 @@ project(":samza-kafka_$scalaVersion") {
test {
// Bump up the heap so we can start ZooKeeper and Kafka brokers.
- maxHeapSize = "1024m"
+ maxHeapSize = "2048m"
}
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4d0ad620/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index b715937..bbbacb5 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -197,18 +197,7 @@ class SystemConsumers(
debug("Refreshing chooser with new messages.")
// Poll every system for new messages.
- val receivedNewMessages = consumers.keys.map(poll(_)).contains(true)
-
- // Update the chooser.
- neededByChooser.foreach(systemStreamPartition =>
- // If we have messages for a stream that the chooser needs, then update.
- if (fetchMap(systemStreamPartition).intValue < maxMsgsPerStreamPartition) {
- chooser.update(unprocessedMessages(systemStreamPartition).dequeue)
- updateFetchMap(systemStreamPartition)
- neededByChooser -= systemStreamPartition
- })
-
- receivedNewMessages
+ consumers.keys.map(poll(_)).contains(true)
}
}
@@ -237,6 +226,7 @@ class SystemConsumers(
}
refresh.maybeCall()
+ updateMessageChooser
envelopeFromChooser
}
@@ -303,4 +293,18 @@ class SystemConsumers(
fetchMap += systemStreamPartition -> fetchSize
systemFetchMapCache += systemName -> systemFetchMap
}
+
+ /**
+ * A helper method that updates MessageChooser. This should be called in
+ * "choose" method after we try to consume a message from MessageChooser.
+ */
+ private def updateMessageChooser {
+ neededByChooser.foreach(systemStreamPartition =>
+ // If we have messages for a stream that the chooser needs, then update.
+ if (fetchMap(systemStreamPartition).intValue < maxMsgsPerStreamPartition) {
+ chooser.update(unprocessedMessages(systemStreamPartition).dequeue)
+ updateFetchMap(systemStreamPartition)
+ neededByChooser -= systemStreamPartition
+ })
+ }
}