You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/04/01 19:16:51 UTC

git commit: SAMZA-213; update message chooser even when backing off polling of consumers

Repository: incubator-samza
Updated Branches:
  refs/heads/master 10c6338a0 -> 4d0ad620b


SAMZA-213; update message chooser even when backing off polling of consumers


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/4d0ad620
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/4d0ad620
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/4d0ad620

Branch: refs/heads/master
Commit: 4d0ad620b7879ae6d051a34766a65cd8e5554b63
Parents: 10c6338
Author: Anh Thu Vu <vu...@gmail.com>
Authored: Tue Apr 1 10:16:45 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-ld.linkedin.biz>
Committed: Tue Apr 1 10:16:45 2014 -0700

----------------------------------------------------------------------
 build.gradle                                    |  2 +-
 .../apache/samza/system/SystemConsumers.scala   | 28 +++++++++++---------
 2 files changed, 17 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4d0ad620/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 8e369b8..4775087 100644
--- a/build.gradle
+++ b/build.gradle
@@ -96,7 +96,7 @@ project(":samza-kafka_$scalaVersion") {
 
   test {
     // Bump up the heap so we can start ZooKeeper and Kafka brokers.
-    maxHeapSize = "1024m"
+    maxHeapSize = "2048m"
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4d0ad620/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index b715937..bbbacb5 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -197,18 +197,7 @@ class SystemConsumers(
       debug("Refreshing chooser with new messages.")
 
       // Poll every system for new messages.
-      val receivedNewMessages = consumers.keys.map(poll(_)).contains(true)
-
-      // Update the chooser.
-      neededByChooser.foreach(systemStreamPartition =>
-        // If we have messages for a stream that the chooser needs, then update.
-        if (fetchMap(systemStreamPartition).intValue < maxMsgsPerStreamPartition) {
-          chooser.update(unprocessedMessages(systemStreamPartition).dequeue)
-          updateFetchMap(systemStreamPartition)
-          neededByChooser -= systemStreamPartition
-        })
-        
-      receivedNewMessages
+      consumers.keys.map(poll(_)).contains(true)
     }
   }
 
@@ -237,6 +226,7 @@ class SystemConsumers(
     }
 
     refresh.maybeCall()
+    updateMessageChooser
     envelopeFromChooser
   }
   
@@ -303,4 +293,18 @@ class SystemConsumers(
     fetchMap += systemStreamPartition -> fetchSize
     systemFetchMapCache += systemName -> systemFetchMap
   }
+  
+  /**
+   * A helper method that updates MessageChooser. This should be called in 
+   * "choose" method after we try to consume a message from MessageChooser.
+   */
+  private def updateMessageChooser {
+    neededByChooser.foreach(systemStreamPartition =>
+      // If we have messages for a stream that the chooser needs, then update.
+      if (fetchMap(systemStreamPartition).intValue < maxMsgsPerStreamPartition) {
+        chooser.update(unprocessedMessages(systemStreamPartition).dequeue)
+        updateFetchMap(systemStreamPartition)
+        neededByChooser -= systemStreamPartition
+      })
+  }
 }