You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/07/25 00:40:56 UTC

git commit: SAMZA-245; improve system consumers performance

Repository: incubator-samza
Updated Branches:
  refs/heads/master 5c65b03a4 -> 7cecf0aef


SAMZA-245; improve system consumers performance


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/7cecf0ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/7cecf0ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/7cecf0ae

Branch: refs/heads/master
Commit: 7cecf0aef9fb968b0851746bdb01eaca1dc81050
Parents: 5c65b03
Author: Chris Riccomini <criccomi@criccomi-mn.(none)>
Authored: Thu Jul 24 15:40:49 2014 -0700
Committer: Chris Riccomini <criccomi@criccomi-mn.(none)>
Committed: Thu Jul 24 15:40:49 2014 -0700

----------------------------------------------------------------------
 .../0.7.0/jobs/configuration-table.html         |  15 +
 .../org/apache/samza/system/SystemConsumer.java |  31 ++-
 .../system/SystemStreamPartitionIterator.java   |  21 +-
 .../apache/samza/util/BlockingEnvelopeMap.java  |  56 ++--
 .../TestSystemStreamPartitionIterator.java      |  22 +-
 .../samza/util/TestBlockingEnvelopeMap.java     |  40 +--
 .../org/apache/samza/config/TaskConfig.scala    |  19 ++
 .../apache/samza/container/SamzaContainer.scala |  13 +-
 .../apache/samza/system/SystemConsumers.scala   | 272 ++++++++++---------
 .../samza/system/SystemConsumersMetrics.scala   |  12 +-
 .../chooser/BufferingMessageChooser.scala       | 208 --------------
 .../system/chooser/RoundRobinChooser.scala      |  27 +-
 .../org/apache/samza/util/DoublingBackOff.scala |  62 -----
 .../samza/system/TestSystemConsumers.scala      | 181 ++++++++++--
 .../chooser/TestBufferingMessageChooser.scala   | 108 --------
 .../TestFileReaderSystemConsumer.scala          |  45 +--
 .../apache/samza/util/TestDoublingBackOff.scala |  61 -----
 .../test/performance/TestPerformanceTask.scala  |   4 +-
 .../TestSamzaContainerPerformance.scala         |   2 +-
 19 files changed, 453 insertions(+), 746 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/docs/learn/documentation/0.7.0/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/jobs/configuration-table.html b/docs/learn/documentation/0.7.0/jobs/configuration-table.html
index edcb74f..41f334f 100644
--- a/docs/learn/documentation/0.7.0/jobs/configuration-table.html
+++ b/docs/learn/documentation/0.7.0/jobs/configuration-table.html
@@ -377,6 +377,21 @@
                 </tr>
 
                 <tr>
+                    <td class="property" id="task-poll-interval-ms">task.poll.interval.ms</td>
+                    <td class="default">50</td>
+                    <td class="description">
+                      Samza's container polls for more messages under two conditions. The first condition arises when there are simply no remaining 
+                      buffered messages to process for any input SystemStreamPartition. The second condition arises when some input 
+                      SystemStreamPartitions have empty buffers, but others do not. In the latter case, a polling interval is defined to determine how
+                      often to refresh the empty SystemStreamPartition buffers. By default, this interval is 50ms, which means that any empty 
+                      SystemStreamPartition buffer will be refreshed at least every 50ms. A higher value here means that empty SystemStreamPartitions 
+                      will be refreshed less often, which means more latency is introduced, but less CPU and network will be used. Decreasing this 
+                      value means that empty SystemStreamPartitions are refreshed more frequently, thereby introducing less latency, but increasing 
+                      CPU and network utilization.
+                    </td>
+                </tr>
+
+                <tr>
                     <th colspan="3" class="section" id="streams"><a href="../container/streams.html">Systems (input and output streams)</a></th>
                 </tr>
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java b/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java
index 591f8fb..37c6c76 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java
@@ -21,6 +21,7 @@ package org.apache.samza.system;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * <p>
@@ -137,35 +138,39 @@ public interface SystemConsumer {
    * Poll the SystemConsumer to get any available messages from the underlying
    * system.
    * 
-   * <p>If the underlying implementation does not take care to adhere to the
+   * <p>
+   * If the underlying implementation does not take care to adhere to the
    * timeout parameter, the SamzaContainer's performance will suffer
    * drastically. Specifically, if poll blocks when it's not supposed to, it
    * will block the entire main thread in SamzaContainer, and no messages will
    * be processed while blocking is occurring.
+   * </p>
    * 
    * @param systemStreamPartitions
-   *          A map from SystemStreamPartition to maximum number of messages to
-   *          return for the SystemStreamPartition. Polling with {stream1: 100,
-   *          stream2: 1000} tells the SystemConsumer that it can return between
-   *          0 and 100 messages (inclusive) for stream1, and between 0 and 1000
-   *          messages for stream2. If SystemConsumer has messages available for
-   *          other registered SystemStreamPartitions, but they are not in the
-   *          systemStreamPartitions map in a given poll invocation, they can't
+   *          A set of SystemStreamPartitions to poll for new messages. If
+   *          SystemConsumer has messages available for other registered
+   *          SystemStreamPartitions, but they are not in the
+   *          systemStreamPartitions set in a given poll invocation, they can't
    *          be returned. It is illegal to pass in SystemStreamPartitions that
    *          have not been registered with the SystemConsumer first.
    * @param timeout
    *          If timeout < 0, poll will block unless all SystemStreamPartition
    *          are at "head" (the underlying system has been checked, and
-   *          returned an empty set). If at head, an empty list is returned. If
+   *          returned an empty set). If at head, an empty map is returned. If
    *          timeout >= 0, poll will return any messages that are currently
    *          available for any of the SystemStreamPartitions specified. If no
    *          new messages are available, it will wait up to timeout
    *          milliseconds for messages from any SystemStreamPartition to become
-   *          available. It will return an empty list if the timeout is hit, and
+   *          available. It will return an empty map if the timeout is hit, and
    *          no new messages are available.
-   * @return A list of zero or more IncomingMessageEnvelopes for the
-   *         SystemStreamPartitions that were supplied during the invocation.
+   * @return A map from SystemStreamPartitions to any available
+   *         IncomingMessageEnvelopes for the SystemStreamPartitions. If no
+   *         messages are available for a SystemStreamPartition that was
+   *         supplied in the polling set, the map will not contain a key for the
+   *         SystemStreamPartition. Will return an empty map, not null, if no
+   *         new messages are available for any SystemStreamPartitions in the
+   *         input set.
    * @throws InterruptedException
    */
-  List<IncomingMessageEnvelope> poll(Map<SystemStreamPartition, Integer> systemStreamPartitions, long timeout) throws InterruptedException;
+  Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java
index 9acfb10..a8f858a 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java
@@ -20,22 +20,25 @@
 package org.apache.samza.system;
 
 import java.util.ArrayDeque;
-import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Queue;
+import java.util.Set;
 
 import org.apache.samza.SamzaException;
 
 /**
- * {@link java.util.Iterator} that wraps a {@link org.apache.samza.system.SystemConsumer} to iterate over
- * the messages the consumer provides for the specified {@link org.apache.samza.system.SystemStreamPartition}.
+ * {@link java.util.Iterator} that wraps a
+ * {@link org.apache.samza.system.SystemConsumer} to iterate over the messages
+ * the consumer provides for the specified
+ * {@link org.apache.samza.system.SystemStreamPartition}.
  */
 public class SystemStreamPartitionIterator implements Iterator<IncomingMessageEnvelope> {
   private final SystemConsumer systemConsumer;
-  private final Map<SystemStreamPartition, Integer> fetchMap;
+  private final Set<SystemStreamPartition> fetchSet;
   private Queue<IncomingMessageEnvelope> peeks;
 
   public SystemStreamPartitionIterator(SystemConsumer systemConsumer, SystemStreamPartition systemStreamPartition) {
@@ -44,8 +47,8 @@ public class SystemStreamPartitionIterator implements Iterator<IncomingMessageEn
 
   public SystemStreamPartitionIterator(SystemConsumer systemConsumer, SystemStreamPartition systemStreamPartition, int fetchSize) {
     this.systemConsumer = systemConsumer;
-    this.fetchMap = new HashMap<SystemStreamPartition, Integer>();
-    this.fetchMap.put(systemStreamPartition, fetchSize);
+    this.fetchSet = new HashSet<SystemStreamPartition>();
+    this.fetchSet.add(systemStreamPartition);
     this.peeks = new ArrayDeque<IncomingMessageEnvelope>();
   }
 
@@ -74,10 +77,10 @@ public class SystemStreamPartitionIterator implements Iterator<IncomingMessageEn
   private void refresh() {
     if (peeks.size() == 0) {
       try {
-        List<IncomingMessageEnvelope> envelopes = systemConsumer.poll(fetchMap, SystemConsumer.BLOCK_ON_OUTSTANDING_MESSAGES);
+        Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopes = systemConsumer.poll(fetchSet, SystemConsumer.BLOCK_ON_OUTSTANDING_MESSAGES);
 
-        if (envelopes != null && envelopes.size() > 0) {
-          peeks.addAll(envelopes);
+        for (List<IncomingMessageEnvelope> systemStreamPartitionEnvelopes : envelopes.values()) {
+          peeks.addAll(systemStreamPartitionEnvelopes);
         }
       } catch (InterruptedException e) {
         throw new SamzaException(e);

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
index 9503739..317e073 100644
--- a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
+++ b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
@@ -20,13 +20,16 @@
 package org.apache.samza.util;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Gauge;
 import org.apache.samza.metrics.MetricsRegistry;
@@ -39,8 +42,8 @@ import org.apache.samza.system.SystemStreamPartition;
  * BlockingEnvelopeMap is a helper class for SystemConsumer implementations.
  * Samza's poll() requirements make implementing SystemConsumers somewhat
  * tricky. BlockingEnvelopeMap is provided to help other developers write
- * SystemConsumers. The intended audience is not those writing Samza jobs,
- * but rather those extending Samza to consume from new types of stream providers
+ * SystemConsumers. The intended audience is not those writing Samza jobs, but
+ * rather those extending Samza to consume from new types of stream providers
  * and other systems.
  * </p>
  * 
@@ -97,55 +100,48 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
     return new LinkedBlockingQueue<IncomingMessageEnvelope>();
   }
 
-  public List<IncomingMessageEnvelope> poll(Map<SystemStreamPartition, Integer> systemStreamPartitionAndMaxPerStream, long timeout) throws InterruptedException {
+  public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
     long stopTime = clock.currentTimeMillis() + timeout;
-    List<IncomingMessageEnvelope> messagesToReturn = new ArrayList<IncomingMessageEnvelope>();
+    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messagesToReturn = new HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>();
 
     metrics.incPoll();
 
-    for (Map.Entry<SystemStreamPartition, Integer> systemStreamPartitionAndMaxCount : systemStreamPartitionAndMaxPerStream.entrySet()) {
-      SystemStreamPartition systemStreamPartition = systemStreamPartitionAndMaxCount.getKey();
-      Integer numMessages = systemStreamPartitionAndMaxCount.getValue();
+    for (SystemStreamPartition systemStreamPartition : systemStreamPartitions) {
       BlockingQueue<IncomingMessageEnvelope> queue = bufferedMessages.get(systemStreamPartition);
-      IncomingMessageEnvelope envelope = null;
-      List<IncomingMessageEnvelope> systemStreamPartitionMessages = new ArrayList<IncomingMessageEnvelope>();
-
-      // First, drain all messages up to numMessages without blocking.
-      // Stop when we've filled the request (max numMessages), or when
-      // we get a null envelope back.
-      for (int i = 0; i < numMessages && (i == 0 || envelope != null); ++i) {
-        envelope = queue.poll();
+      List<IncomingMessageEnvelope> outgoingList = new ArrayList<IncomingMessageEnvelope>(queue.size());
 
-        if (envelope != null) {
-          systemStreamPartitionMessages.add(envelope);
-        }
-      }
+      if (queue.size() > 0) {
+        queue.drainTo(outgoingList);
+      } else if (timeout != 0) {
+        IncomingMessageEnvelope envelope = null;
 
-      // Now block if blocking is allowed and we have no messages.
-      if (systemStreamPartitionMessages.size() == 0) {
         // How long we can legally block (if timeout > 0)
         long timeRemaining = stopTime - clock.currentTimeMillis();
 
         if (timeout == SystemConsumer.BLOCK_ON_OUTSTANDING_MESSAGES) {
-          while (systemStreamPartitionMessages.size() < numMessages && !isAtHead(systemStreamPartition)) {
+          // Block until we get at least one message, or until we catch up to
+          // the head of the stream.
+          while (envelope == null && !isAtHead(systemStreamPartition)) {
             metrics.incBlockingPoll(systemStreamPartition);
             envelope = queue.poll(1000, TimeUnit.MILLISECONDS);
-
-            if (envelope != null) {
-              systemStreamPartitionMessages.add(envelope);
-            }
           }
         } else if (timeout > 0 && timeRemaining > 0) {
+          // Block until we get at least one message.
           metrics.incBlockingTimeoutPoll(systemStreamPartition);
           envelope = queue.poll(timeRemaining, TimeUnit.MILLISECONDS);
+        }
 
-          if (envelope != null) {
-            systemStreamPartitionMessages.add(envelope);
-          }
+        // If we got a message, add it.
+        if (envelope != null) {
+          outgoingList.add(envelope);
+          // Drain any remaining messages without blocking.
+          queue.drainTo(outgoingList);
         }
       }
 
-      messagesToReturn.addAll(systemStreamPartitionMessages);
+      if (outgoingList.size() > 0) {
+        messagesToReturn.put(systemStreamPartition, outgoingList);
+      }
     }
 
     return messagesToReturn;

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-api/src/test/java/org/apache/samza/system/TestSystemStreamPartitionIterator.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/system/TestSystemStreamPartitionIterator.java b/samza-api/src/test/java/org/apache/samza/system/TestSystemStreamPartitionIterator.java
index 3ecabab..5af2a11 100644
--- a/samza-api/src/test/java/org/apache/samza/system/TestSystemStreamPartitionIterator.java
+++ b/samza-api/src/test/java/org/apache/samza/system/TestSystemStreamPartitionIterator.java
@@ -24,13 +24,15 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
-
-import org.junit.Test;
+import java.util.Queue;
+import java.util.Set;
 
 import org.apache.samza.Partition;
+import org.junit.Test;
 
 public class TestSystemStreamPartitionIterator {
   private static final SystemStreamPartition SSP = new SystemStreamPartition("test", "test", new Partition(0));
@@ -105,14 +107,20 @@ public class TestSystemStreamPartitionIterator {
     }
 
     @Override
-    public List<IncomingMessageEnvelope> poll(Map<SystemStreamPartition, Integer> systemStreamPartitions, long timeout) {
-      List<IncomingMessageEnvelope> list = new ArrayList<IncomingMessageEnvelope>();
+    public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) {
+      Map<SystemStreamPartition, List<IncomingMessageEnvelope>> systemStreamPartitionEnvelopes = new HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>();
+
+      for (SystemStreamPartition systemStreamPartition : systemStreamPartitions) {
+        List<IncomingMessageEnvelope> q = new ArrayList<IncomingMessageEnvelope>();
+
+        if (numPollReturnsWithMessages-- > 0) {
+          q.add(new IncomingMessageEnvelope(SSP, "", null, numPollReturnsWithMessages));
+        }
 
-      if (numPollReturnsWithMessages-- > 0) {
-        list.add(new IncomingMessageEnvelope(SSP, "", null, numPollReturnsWithMessages));
+        systemStreamPartitionEnvelopes.put(systemStreamPartition, q);
       }
 
-      return list;
+      return systemStreamPartitionEnvelopes;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
index cb4d148..35ba52d 100644
--- a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
+++ b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
@@ -19,26 +19,29 @@
 
 package org.apache.samza.util;
 
-import static org.junit.Assert.*;
-import java.util.HashMap;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import org.junit.Test;
 import org.apache.samza.Partition;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Test;
 
 public class TestBlockingEnvelopeMap {
   private static final SystemStreamPartition SSP = new SystemStreamPartition("test", "test", new Partition(0));
   private static final IncomingMessageEnvelope envelope = new IncomingMessageEnvelope(SSP, null, null, null);
-  private static final Map<SystemStreamPartition, Integer> FETCH = new HashMap<SystemStreamPartition, Integer>();
+  private static final Set<SystemStreamPartition> FETCH = new HashSet<SystemStreamPartition>();
 
   static {
-    FETCH.put(SSP, 10);
+    FETCH.add(SSP);
   }
 
   @Test
@@ -65,33 +68,14 @@ public class TestBlockingEnvelopeMap {
     BlockingEnvelopeMap map = new MockBlockingEnvelopeMap();
     map.register(SSP, "0");
     map.put(SSP, envelope);
-    List<IncomingMessageEnvelope> envelopes = map.poll(FETCH, 0);
+    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopes = map.poll(FETCH, 0);
     assertEquals(1, envelopes.size());
+    assertEquals(1, envelopes.get(SSP).size());
     map.put(SSP, envelope);
     map.put(SSP, envelope);
     envelopes = map.poll(FETCH, 0);
-    assertEquals(2, envelopes.size());
-  }
-
-  @Test
-  public void testShouldNotReturnMoreEnvelopesThanAllowed() throws InterruptedException {
-    BlockingEnvelopeMap map = new MockBlockingEnvelopeMap();
-    int maxMessages = FETCH.get(SSP);
-
-    map.register(SSP, "0");
-
-    for (int i = 0; i < 3 * maxMessages; ++i) {
-      map.put(SSP, envelope);
-    }
-
-    assertEquals(3 * maxMessages, map.getNumMessagesInQueue(SSP));
-    assertEquals(maxMessages, map.poll(FETCH, 0).size());
-    assertEquals(2 * maxMessages, map.getNumMessagesInQueue(SSP));
-    assertEquals(maxMessages, map.poll(FETCH, 30).size());
-    assertEquals(maxMessages, map.getNumMessagesInQueue(SSP));
-    assertEquals(maxMessages, map.poll(FETCH, 0).size());
-    assertEquals(0, map.getNumMessagesInQueue(SSP));
-    assertEquals(0, map.poll(FETCH, 0).size());
+    assertEquals(1, envelopes.size());
+    assertEquals(2, envelopes.get(SSP).size());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index 8b881f2..21d8903 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -36,6 +36,23 @@ object TaskConfig {
   val DROP_DESERIALIZATION_ERROR = "task.drop.deserialization.errors" // define whether drop the messages or not when deserialization fails
   val DROP_SERIALIZATION_ERROR = "task.drop.serialization.errors" // define whether drop the messages or not when serialization fails
 
+  /**
+   * Samza's container polls for more messages under two conditions. The first
+   * condition arises when there are simply no remaining buffered messages to
+   * process for any input SystemStreamPartition. The second condition arises
+   * when some input SystemStreamPartitions have empty buffers, but others do
+   * not. In the latter case, a polling interval is defined to determine how
+   * often to refresh the empty SystemStreamPartition buffers. By default,
+   * this interval is 50ms, which means that any empty SystemStreamPartition
+   * buffer will be refreshed at least every 50ms. A higher value here means
+   * that empty SystemStreamPartitions will be refreshed less often, which
+   * means more latency is introduced, but less CPU and network will be used.
+   * Decreasing this value means that empty SystemStreamPartitions are
+   * refreshed more frequently, thereby introducing less latency, but
+   * increasing CPU and network utilization.
+   */
+  val POLL_INTERVAL_MS = "task.poll.interval.ms"
+
   implicit def Config2Task(config: Config) = new TaskConfig(config)
 }
 
@@ -76,4 +93,6 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) {
   def getDropDeserialization = getOption(TaskConfig.DROP_DESERIALIZATION_ERROR)
 
   def getDropSerialization = getOption(TaskConfig.DROP_SERIALIZATION_ERROR)
+
+  def getPollIntervalMs = getOption(TaskConfig.POLL_INTERVAL_MS)
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index bff6000..a7142b2 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -306,23 +306,28 @@ object SamzaContainer extends Logging {
 
     info("Got offset manager: %s" format offsetManager)
 
-    val dropDeserializationError: Boolean = config.getDropDeserialization match {
+    val dropDeserializationError = config.getDropDeserialization match {
       case Some(dropError) => dropError.toBoolean
       case _ => false
     }
 
-    val dropSerializationError: Boolean = config.getDropSerialization match {
+    val dropSerializationError = config.getDropSerialization match {
       case Some(dropError) => dropError.toBoolean
       case _ => false
     }
 
+    val pollIntervalMs = config
+      .getPollIntervalMs
+      .getOrElse(SystemConsumers.DEFAULT_POLL_INTERVAL_MS.toString)
+      .toInt
+
     val consumerMultiplexer = new SystemConsumers(
-      // TODO add config values for no new message timeout and max msgs per stream partition
       chooser = chooser,
       consumers = consumers,
       serdeManager = serdeManager,
       metrics = systemConsumersMetrics,
-      dropDeserializationError = dropDeserializationError)
+      dropDeserializationError = dropDeserializationError,
+      pollIntervalMs = pollIntervalMs)
 
     val producerMultiplexer = new SystemProducers(
       producers = producers,

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index 9eb70f2..fef7227 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -20,13 +20,21 @@
 package org.apache.samza.system
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable.Queue
 import org.apache.samza.serializers.SerdeManager
 import grizzled.slf4j.Logging
 import org.apache.samza.system.chooser.MessageChooser
-import org.apache.samza.util.DoublingBackOff
-import org.apache.samza.system.chooser.BufferingMessageChooser
 import org.apache.samza.SamzaException
+import java.util.HashMap
+import java.util.ArrayDeque
+import java.util.Queue
+import java.util.Set
+import java.util.HashSet
+
+object SystemConsumers {
+  val DEFAULT_POLL_INTERVAL_MS = 50
+  val DEFAULT_NO_NEW_MESSAGES_TIMEOUT = 10
+  val DEFAULT_DROP_SERIALIZATION_ERROR = false
+}
 
 /**
  * The SystemConsumers class coordinates between all SystemConsumers, the
@@ -59,98 +67,100 @@ class SystemConsumers(
   metrics: SystemConsumersMetrics = new SystemConsumersMetrics,
 
   /**
-   * The maximum number of messages to poll from a single SystemStreamPartition.
-   */
-  maxMsgsPerStreamPartition: Int = 10000,
-
-  /**
    * If MessageChooser returns null when it's polled, SystemConsumers will
    * poll each SystemConsumer with a timeout next time it tries to poll for
    * messages. Setting the timeout to 0 means that SamzaContainer's main
    * thread will sit in a tight loop polling every SystemConsumer over and
    * over again if no new messages are available.
    */
-  noNewMessagesTimeout: Long = 10,
+  noNewMessagesTimeout: Int = SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT,
 
   /**
-   * This parameter is to define how to deal with deserialization failure. If set to true,
-   * the task will skip the messages when deserialization fails. If set to false, the task
-   * will throw SamzaException and fail the container.
+   * This parameter is to define how to deal with deserialization failure. If
+   * set to true, the task will skip the messages when deserialization fails.
+   * If set to false, the task will throw SamzaException and fail the container.
    */
-  dropDeserializationError: Boolean = false) extends Logging {
+  dropDeserializationError: Boolean = SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
 
   /**
-   * The buffer where SystemConsumers stores all incoming message envelopes.
+   * <p>Defines an upper bound for how long the SystemConsumers will wait
+   * before polling systems for more data. The default setting is 50ms, which
+   * means that SystemConsumers will poll for new messages for all
+   * SystemStreamPartitions with empty buffers every 50ms. SystemConsumers
+   * will also poll for new messages any time that there are no available
+   * messages to process, or any time the MessageChooser returns a null
+   * IncomingMessageEnvelope.</p>
+   *
+   * <p>This parameter also implicitly defines how much latency is introduced
+   * by SystemConsumers. If a message is available for a SystemStreamPartition
+   * with no remaining unprocessed messages, the SystemConsumers will poll for
+   * it within roughly pollIntervalMs (50ms by default) of its availability in the stream system.</p>
    */
-  val buffer = new BufferingMessageChooser(chooser)
+  pollIntervalMs: Int = SystemConsumers.DEFAULT_POLL_INTERVAL_MS,
 
   /**
-   * A map of every SystemStreamPartition that SystemConsumers is responsible
-   * for polling. The values are how many messages to poll for during the next
-   * SystemConsumers.poll call.
-   *
-   * If the value for a SystemStreamPartition is maxMsgsPerStreamPartition,
-   * then the implication is that SystemConsumers has no incoming messages in
-   * its buffer for the SystemStreamPartition. If the value is 0 then the
-   * SystemConsumers' buffer is full for the SystemStreamPartition.
+   * Clock can be used to inject a custom clock when mocking this class in
+   * tests. The default implementation returns the current system clock time.
    */
-  var fetchMap = Map[SystemStreamPartition, java.lang.Integer]()
+  clock: () => Long = () => System.currentTimeMillis) extends Logging {
 
   /**
-   * A cache of fetchMap values, grouped according to the system. This is
-   * purely a trick to get better performance out of the SystemConsumsers
-   * class, since the map from systemName to its fetchMap is used for every
-   * poll call.
+   * A buffer of incoming messages grouped by SystemStreamPartition. These
+   * messages are handed out to the MessageChooser as it needs them.
    */
-  var systemFetchMapCache = Map[String, Map[SystemStreamPartition, java.lang.Integer]]()
+  private val unprocessedMessagesBySSP = new HashMap[SystemStreamPartition, Queue[IncomingMessageEnvelope]]()
+
+  /**
+   * A set of SystemStreamPartitions, grouped by systemName, that need more
+   * messages. Only these SystemStreamPartitions are passed to the underlying
+   * system consumer's poll call; an SSP is removed once envelopes arrive for it.
+   */
+  private val emptySystemStreamPartitionsBySystem = new HashMap[String, Set[SystemStreamPartition]]()
 
   /**
    * Default timeout to noNewMessagesTimeout. Every time SystemConsumers
-   * receives incoming messages, it sets timout to 0. Every time
+   * gets an envelope from the MessageChooser, it sets timeout to 0. Every time
    * SystemConsumers receives no new incoming messages from the MessageChooser,
    * it sets timeout to noNewMessagesTimeout again.
    */
   var timeout = noNewMessagesTimeout
 
   /**
-   * Make the maximum backoff proportional to the number of streams we're consuming.
-   * For a few streams, make the max back off 1, but for hundreds make it up to 1k,
-   * which experimentally has shown to be the most performant.
+   * The last time that systems were polled for new messages.
    */
-  var maxBackOff = 0
+  var lastPollMs = 0L
 
   /**
-   * How low totalUnprocessedMessages has to get before the consumers are
-   * polled for more data. This is defined to be 10% of
-   * maxMsgsPerStreamPartition. Since maxMsgsPerStreamPartition defaults to
-   * 10000, the default refreshThreshold is 1000.
+   * Total number of unprocessed messages in unprocessedMessagesBySSP.
    */
-  val refreshThreshold = maxMsgsPerStreamPartition * .1
+  var totalUnprocessedMessages = 0
 
   debug("Got stream consumers: %s" format consumers)
-  debug("Got max messages per stream: %s" format maxMsgsPerStreamPartition)
   debug("Got no new message timeout: %s" format noNewMessagesTimeout)
 
-  metrics.setUnprocessedMessages(() => buffer.unprocessedMessages.size)
-  metrics.setNeededByChooser(() => buffer.neededByChooser.size)
   metrics.setTimeout(() => timeout)
-  metrics.setMaxMessagesPerStreamPartition(() => maxMsgsPerStreamPartition)
-  metrics.setNoNewMessagesTimeout(() => noNewMessagesTimeout)
+  metrics.setNeededByChooser(() => emptySystemStreamPartitionsBySystem.size)
+  metrics.setUnprocessedMessages(() => totalUnprocessedMessages)
 
   def start {
     debug("Starting consumers.")
 
-    maxBackOff = scala.math.pow(10, scala.math.log10(fetchMap.size).toInt).toInt
-
-    debug("Got maxBackOff: " + maxBackOff)
+    emptySystemStreamPartitionsBySystem ++= unprocessedMessagesBySSP
+      .keySet
+      .groupBy(_.getSystem)
+      .mapValues(systemStreamPartitions => new HashSet(systemStreamPartitions.toSeq))
 
     consumers
       .keySet
       .foreach(metrics.registerSystem)
 
-    consumers.values.foreach(_.start)
+    consumers
+      .values
+      .foreach(_.start)
+
+    chooser.start
 
-    buffer.start
+    refresh
   }
 
   def stop {
@@ -158,15 +168,14 @@ class SystemConsumers(
 
     consumers.values.foreach(_.stop)
 
-    buffer.stop
+    chooser.stop
   }
 
   def register(systemStreamPartition: SystemStreamPartition, offset: String) {
     debug("Registering stream: %s, %s" format (systemStreamPartition, offset))
-
     metrics.registerSystemStream(systemStreamPartition.getSystemStream)
-    buffer.register(systemStreamPartition, offset)
-    updateFetchMap(systemStreamPartition, maxMsgsPerStreamPartition)
+    unprocessedMessagesBySSP.put(systemStreamPartition, new ArrayDeque[IncomingMessageEnvelope]())
+    chooser.register(systemStreamPartition, offset)
 
     try {
       consumers(systemStreamPartition.getSystem).register(systemStreamPartition, offset)
@@ -175,135 +184,128 @@ class SystemConsumers(
     }
   }
 
-  /**
-   * Needs to be be lazy so that we are sure to get the value of maxBackOff assigned
-   * in start(), rather than its initial value.
-   */
-  lazy val refresh = new DoublingBackOff(maxBackOff) {
-    def call(): Boolean = {
-      debug("Refreshing chooser with new messages.")
-
-      // Poll every system for new messages.
-      consumers.keys.map(poll(_)).contains(true)
-    }
-  }
-
   def choose: IncomingMessageEnvelope = {
-    val envelopeFromChooser = buffer.choose
+    val envelopeFromChooser = chooser.choose
 
     if (envelopeFromChooser == null) {
       debug("Chooser returned null.")
 
       metrics.choseNull.inc
 
-      // Allow blocking if the chooser didn't choose a message.
+      // Poll with a timeout next time so we don't spin in a tight loop.
       timeout = noNewMessagesTimeout
     } else {
-      debug("Chooser returned an incoming message envelope: %s" format envelopeFromChooser)
+      val systemStreamPartition = envelopeFromChooser.getSystemStreamPartition
 
-      metrics.choseObject.inc
+      debug("Chooser returned an incoming message envelope: %s" format envelopeFromChooser)
 
-      // Don't block if we have a message to process.
+      // Ok to give the chooser a new message from this stream.
       timeout = 0
-
+      metrics.choseObject.inc
       metrics.systemStreamMessagesChosen(envelopeFromChooser.getSystemStreamPartition.getSystemStream).inc
+
+      if (!update(systemStreamPartition)) {
+        emptySystemStreamPartitionsBySystem.get(systemStreamPartition.getSystem).add(systemStreamPartition)
+      }
     }
 
-    // Always refresh if we got nothing from the chooser. Otherwise, just 
-    // refresh when the buffer is low. 
-    if (envelopeFromChooser == null || buffer.totalUnprocessedMessages <= refreshThreshold) {
-      refresh.maybeCall()
+    if (envelopeFromChooser == null || lastPollMs < clock() - pollIntervalMs) {
+      refresh
     }
 
-    updateMessageChooser
     envelopeFromChooser
   }
 
   /**
-   * Poll a system for new messages from SystemStreamPartitions that have
-   * dipped below the depletedQueueSizeThreshold threshold.  Return true if
-   * any envelopes were found, false if none.
+   * Poll the given system's consumer for all of its SystemStreamPartitions
+   * that currently have no messages left to process.
    */
-  private def poll(systemName: String): Boolean = {
+  private def poll(systemName: String) {
     debug("Polling system consumer: %s" format systemName)
 
     metrics.systemPolls(systemName).inc
 
-    val consumer = consumers(systemName)
-
     debug("Getting fetch map for system: %s" format systemName)
 
-    val systemFetchMap = systemFetchMapCache(systemName)
+    val systemFetchSet = emptySystemStreamPartitionsBySystem.get(systemName)
 
-    debug("Fetching: %s" format systemFetchMap)
+    // Poll when at least one SSP in this system needs more messages.
+    if (systemFetchSet.size > 0) {
+      val consumer = consumers(systemName)
 
-    metrics.systemStreamPartitionFetchesPerPoll(systemName).inc(systemFetchMap.size)
+      debug("Fetching: %s" format systemFetchSet)
 
-    val incomingEnvelopes = consumer.poll(systemFetchMap, timeout)
+      metrics.systemStreamPartitionFetchesPerPoll(systemName).inc(systemFetchSet.size)
 
-    debug("Got incoming message envelopes: %s" format incomingEnvelopes)
+      val systemStreamPartitionEnvelopes = consumer.poll(systemFetchSet, timeout)
 
-    metrics.systemMessagesPerPoll(systemName).inc
+      debug("Got incoming message envelopes: %s" format systemStreamPartitionEnvelopes)
 
-    // We have new un-processed envelopes, so update maps accordingly.
-    incomingEnvelopes.foreach(envelope => {
-      val systemStreamPartition = envelope.getSystemStreamPartition
+      metrics.systemMessagesPerPoll(systemName).inc
 
-      val messageEnvelope = try {
-        Some(serdeManager.fromBytes(envelope))
-      } catch {
-        case e: Exception if !dropDeserializationError => throw new SystemConsumersException("can not deserialize the message", e)
-        case ex: Throwable => {
-          debug("Deserialization fails: %s . Drop the error message" format ex)
-          metrics.deserializationError.inc
-          None
-        }
-      }
+      val sspAndEnvelopeIterator = systemStreamPartitionEnvelopes.entrySet.iterator
 
-      if (!messageEnvelope.isEmpty) {
-        buffer.update(messageEnvelope.get)
-      }
+      while (sspAndEnvelopeIterator.hasNext) {
+        val sspAndEnvelope = sspAndEnvelopeIterator.next
+        val systemStreamPartition = sspAndEnvelope.getKey
+        val envelopes = new ArrayDeque(sspAndEnvelope.getValue)
+        val numEnvelopes = envelopes.size
+        totalUnprocessedMessages += numEnvelopes
 
-      debug("Got message for: %s, %s" format (systemStreamPartition, envelope))
+        if (numEnvelopes > 0) {
+          unprocessedMessagesBySSP.put(systemStreamPartition, envelopes)
 
-      updateFetchMap(systemStreamPartition, -1)
+          // Update the chooser if it needs a message for this SSP.
+          if (emptySystemStreamPartitionsBySystem.get(systemStreamPartition.getSystem).remove(systemStreamPartition)) {
+            update(systemStreamPartition)
+          }
+        }
+      }
+    } else {
+      debug("Skipping polling for %s. Already have messages available for all registered SystemStreamPartitions." format (systemName))
+    }
+  }
 
-      debug("Updated fetch map for: %s, %s" format (systemStreamPartition, fetchMap))
-    })
+  private def refresh {
+    debug("Refreshing chooser with new messages.")
 
-    !incomingEnvelopes.isEmpty
+    // Update last poll time so we don't poll too frequently.
+    lastPollMs = clock()
+
+    // Poll every system for new messages.
+    consumers.keys.map(poll(_))
   }
 
   /**
-   * A helper method that updates both fetchMap and systemFetchMapCache
-   * simultaneously. This is a convenience method to make sure that the
-   * systemFetchMapCache stays in sync with fetchMap.
+   * Tries to update the MessageChooser with the next deserializable envelope
+   * buffered for the supplied SystemStreamPartition. Returns true on success.
    */
-  private def updateFetchMap(systemStreamPartition: SystemStreamPartition, amount: Int = 1) {
-    val fetchSize = fetchMap.getOrElse(systemStreamPartition, java.lang.Integer.valueOf(0)).intValue + amount
-    val systemName = systemStreamPartition.getSystem
-    var systemFetchMap = systemFetchMapCache.getOrElse(systemName, Map())
+  private def update(systemStreamPartition: SystemStreamPartition) = {
+    var updated = false
+    val q = unprocessedMessagesBySSP.get(systemStreamPartition)
+
+    while (q.size > 0 && !updated) {
+      val rawEnvelope = q.remove
+      val deserializedEnvelope = try {
+        Some(serdeManager.fromBytes(rawEnvelope))
+      } catch {
+        case e: Exception if !dropDeserializationError =>
+          throw new SystemConsumersException("Cannot deserialize an incoming message.", e)
+        case ex: Exception =>
+          debug("Cannot deserialize an incoming message. Dropping the error message.", ex)
+          metrics.deserializationError.inc
+          None
+      }
 
-    if (fetchSize >= refreshThreshold) {
-      systemFetchMap += systemStreamPartition -> fetchSize
-    } else {
-      systemFetchMap -= systemStreamPartition
-    }
+      if (deserializedEnvelope.isDefined) {
+        chooser.update(deserializedEnvelope.get)
+        updated = true
+      }
 
-    fetchMap += systemStreamPartition -> fetchSize
-    systemFetchMapCache += systemName -> systemFetchMap
-  }
+      totalUnprocessedMessages -= 1
+    }
 
-  /**
-   * A helper method that updates MessageChooser. This should be called in
-   * "choose" method after we try to consume a message from MessageChooser.
-   */
-  private def updateMessageChooser {
-    buffer
-      .flush
-      // Let the fetchMap know of any SSPs that were given to the chooser, so 
-      // a new fetch can be triggered if the buffer is low.
-      .foreach(updateFetchMap(_))
+    updated
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
index b065ae6..a63349c 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
@@ -33,10 +33,6 @@ class SystemConsumersMetrics(val registry: MetricsRegistry = new MetricsRegistry
   val systemMessagesPerPoll = scala.collection.mutable.Map[String, Counter]()
   val systemStreamMessagesChosen = scala.collection.mutable.Map[SystemStream, Counter]()
 
-  def setUnprocessedMessages(getValue: () => Int) {
-    newGauge("unprocessed-messages", getValue)
-  }
-
   def setNeededByChooser(getValue: () => Int) {
     newGauge("ssps-needed-by-chooser", getValue)
   }
@@ -45,12 +41,8 @@ class SystemConsumersMetrics(val registry: MetricsRegistry = new MetricsRegistry
     newGauge("poll-timeout", getValue)
   }
 
-  def setMaxMessagesPerStreamPartition(getValue: () => Int) {
-    newGauge("max-buffered-messages-per-stream-partition", getValue)
-  }
-
-  def setNoNewMessagesTimeout(getValue: () => Long) {
-    newGauge("blocking-poll-timeout", getValue)
+  def setUnprocessedMessages(getValue: () => Int) {
+    newGauge("unprocessed-messages", getValue)
   }
 
   def registerSystem(systemName: String) {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-core/src/main/scala/org/apache/samza/system/chooser/BufferingMessageChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/BufferingMessageChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/BufferingMessageChooser.scala
deleted file mode 100644
index c7ef6ef..0000000
--- a/samza-core/src/main/scala/org/apache/samza/system/chooser/BufferingMessageChooser.scala
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.system.chooser
-
-import scala.collection.mutable.Queue
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.system.IncomingMessageEnvelope
-
-/**
- * This buffer is responsible for storing new unprocessed
- * IncomingMessageEnvelopes, feeding the envelopes to the message chooser,
- * and coordinating with the chooser to pick the next message to be processed.
- */
-class BufferingMessageChooser(chooser: MessageChooser) extends MessageChooser {
-  import ChooserStatus._
-
-  /**
-   * A buffer of incoming messages grouped by SystemStreamPartition.
-   */
-  var unprocessedMessages = Map[SystemStreamPartition, Queue[IncomingMessageEnvelope]]()
-
-  /**
-   * A count of all messages sitting in SystemConsumers' unprocessedMessages
-   * buffer, and inside the MessageChooser.
-   */
-  var totalUnprocessedMessages = 0
-
-  /**
-   * A map that contains the current status for each SSP that's been
-   * registered to the coordinator.
-   */
-  var statuses = Map[SystemStreamPartition, ChooserStatus]()
-
-  /**
-   * This is a cache of all SSPs that are currently in the "NeededByChooser"
-   * state. It's simply here to improve performance, since it means we don't
-   * need to iterate over all SSPs in the statuses map in order to determine
-   * which SSPs are currently needed by the chooser.
-   */
-  var neededByChooser = Set[SystemStreamPartition]()
-
-  /**
-   * Start the chooser that this buffer is managing.
-   */
-  def start = chooser.start
-
-  /**
-   * Stop the chooser that this buffer is managing.
-   */
-  def stop = chooser.stop
-
-  /**
-   * Register a new SystemStreamPartition with this buffer, as well as the
-   * underlying chooser.
-   */
-  def register(systemStreamPartition: SystemStreamPartition, offset: String) {
-    unprocessedMessages += systemStreamPartition -> Queue[IncomingMessageEnvelope]()
-    chooser.register(systemStreamPartition, offset)
-    setStatus(systemStreamPartition, NeededByChooser)
-  }
-
-  /**
-   * Add a new unprocessed IncomingMessageEnvelope to the buffer. The buffer
-   * will hold on to this envelope, and eventually feed it to the underlying chooser.
-   */
-  def update(envelope: IncomingMessageEnvelope) {
-    val systemStreamPartition = envelope.getSystemStreamPartition
-
-    unprocessedMessages(envelope.getSystemStreamPartition).enqueue(envelope)
-    totalUnprocessedMessages += 1
-
-    if (statuses(systemStreamPartition).equals(SkippingChooser)) {
-      // If we were skipping messages from this SystemStreamPartition, update 
-      // neededByChooser since we've got messages for it now.
-      setStatus(systemStreamPartition, NeededByChooser)
-    }
-  }
-
-  /**
-   * Tell the buffer to update the underlying chooser with any SSPs that the
-   * chooser needs.
-   *
-   * @return A set of SystemStreamPartitions that were updated in the chooser
-   *         as a result of this method invocation.
-   */
-  def flush: Set[SystemStreamPartition] = {
-    var updatedSystemStreamPartitions = Set[SystemStreamPartition]()
-
-    neededByChooser.foreach(systemStreamPartition => {
-      if (unprocessedMessages(systemStreamPartition).size > 0) {
-        // If we have messages for a stream that the chooser needs, then update.
-        chooser.update(unprocessedMessages(systemStreamPartition).dequeue)
-        updatedSystemStreamPartitions += systemStreamPartition
-        setStatus(systemStreamPartition, InChooser)
-      } else {
-        // If we find that we have no messages for this SystemStreamPartition, 
-        // rather than continue trying to update the chooser with this 
-        // SystemStreamPartition, add it to the skip set and remove it from 
-        // the neededByChooser set (see below).
-        setStatus(systemStreamPartition, SkippingChooser)
-      }
-    })
-
-    updatedSystemStreamPartitions
-  }
-
-  /**
-   * Choose a message from the underlying chooser, and return it.
-   *
-   * @return The IncomingMessageEnvelope that the chooser has picked, or null
-   *         if the chooser didn't pick anything.
-   */
-  def choose = {
-    val envelope = chooser.choose
-
-    if (envelope != null) {
-      setStatus(envelope.getSystemStreamPartition, NeededByChooser)
-
-      // Chooser picked a message, so we've got one less unprocessed message.
-      totalUnprocessedMessages -= 1
-    }
-
-    envelope
-  }
-
-  /**
-   * Update the status of a SystemStreamPartition.
-   */
-  private def setStatus(systemStreamPartition: SystemStreamPartition, status: ChooserStatus) {
-    statuses += systemStreamPartition -> status
-
-    if (status.equals(NeededByChooser)) {
-      neededByChooser += systemStreamPartition
-    } else {
-      neededByChooser -= systemStreamPartition
-    }
-  }
-}
-
-/**
- * ChooserStatus denotes the current state of a SystemStreamPartition for a
- * MessageChooser. This state is used to improve performance in the buffer.
- * To update a MessageChooser, we first check if an envelope exists for each
- * SSP that the buffer needs. If the buffer contains a lot of empty queues,
- * then the operation of iterating over all needed SSPs, and discovering that
- * their queues are empty is a waste of time, since they'll remain empty until
- * a new envelope is added to the queue, which the buffer knows by virtue of
- * having access to the enqueue method. Instead, we stop checking for an empty
- * SSP (SkippingChooser), until a new envelope is added via the enqueue method.
- */
-object ChooserStatus extends Enumeration {
-  type ChooserStatus = Value
-
-  /**
-   * When an envelope has been updated for the MessageChooser, the
-   * SystemStreamPartition for the envelope should be set to the InChooser
-   * state. The state will remain this way until the MessageChooser returns an
-   * envelope with the same SystemStreamPartition, at which point, the
-   * SystemStreamPartition's state should be transitioned to NeededByChooser
-   * (see below).
-   */
-  val InChooser = Value
-
-  /**
-   * If a SystemStreamPartition is not in the InChooser state, and it's
-   * unclear if the buffer has more messages available for the SSP, the SSP
-   * should be in the NeededByChooser state. This state means that the chooser
-   * should be updated with a new message from the SSP, if one is available.
-   */
-  val NeededByChooser = Value
-
-  /**
-   * When a SystemStreamPartition is in the NeededByChooser state, and we try
-   * to update the message chooser with a new envelope from the buffer for the
-   * SSP, there are two potential outcomes. One is that there is an envelope
-   * in the buffer for the SSP. In this case, the state will be transitioned
-   * to InChooser. The other possibility is that there is no envelope in the
-   * buffer at the time the update is trying to occur. In the latter case, the
-   * SSP is transitioned to the SkippingChooser state. This state means that
-   * the buffer will cease to update the chooser with envelopes from this SSP
-   * until a new envelope for the SSP is added to the buffer again.
-   *
-   * <br/><br/>
-   *
-   * The reason that this state exists is purely to improve performance. If we
-   * simply leave SSPs in the NeededByChooser state, we will try and update the
-   * chooser on every updateChooser call for all SSPs. This is a waste of time
-   * if the buffer contains a large number of empty SSPs.
-   */
-  val SkippingChooser = Value
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-core/src/main/scala/org/apache/samza/system/chooser/RoundRobinChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/RoundRobinChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/RoundRobinChooser.scala
index 5374121..4ecf1f2 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/chooser/RoundRobinChooser.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/RoundRobinChooser.scala
@@ -26,7 +26,6 @@ import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.metrics.ReadableMetricsRegistry
-
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.metrics.MetricsHelper
 
@@ -42,13 +41,6 @@ import org.apache.samza.metrics.MetricsHelper
 class RoundRobinChooser(metrics: RoundRobinChooserMetrics = new RoundRobinChooserMetrics) extends BaseMessageChooser {
 
   /**
-   * SystemStreamPartitions that the chooser has received a message for, but
-   * have not yet returned. Envelopes for these SystemStreamPartitions should
-   * be in the queue.
-   */
-  var inflightSystemStreamPartitions = Set[SystemStreamPartition]()
-
-  /**
    * Queue of potential messages to process. Round robin will always choose
    * the message at the head of the queue. A queue can be used to implement
    * round robin here because we only get one envelope per
@@ -61,29 +53,12 @@ class RoundRobinChooser(metrics: RoundRobinChooserMetrics = new RoundRobinChoose
   }
 
   def update(envelope: IncomingMessageEnvelope) = {
-    if (inflightSystemStreamPartitions.contains(envelope.getSystemStreamPartition)) {
-      throw new SamzaException("Received more than one envelope from the same "
-        + "SystemStreamPartition without returning the last. This is a "
-        + "violation of the contract with SystemConsumers, and breaks this "
-        + "RoundRobin implementation.")
-    }
-
     q.add(envelope)
-    inflightSystemStreamPartitions += envelope.getSystemStreamPartition
   }
 
-  def choose = {
-    val envelope = q.poll
-
-    if (envelope != null) {
-      inflightSystemStreamPartitions -= envelope.getSystemStreamPartition
-    }
-
-    envelope
-  }
+  def choose = q.poll
 }
 
-
 class RoundRobinChooserMetrics(val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
   def setBufferedMessages(getValue: () => Int) {
     newGauge("buffered-messages", getValue)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-core/src/main/scala/org/apache/samza/util/DoublingBackOff.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/DoublingBackOff.scala b/samza-core/src/main/scala/org/apache/samza/util/DoublingBackOff.scala
deleted file mode 100644
index e1d6d4c..0000000
--- a/samza-core/src/main/scala/org/apache/samza/util/DoublingBackOff.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.util
-
-/**
- * Perform the provided action (via the call method) and, if it returns true,
- * perform it again, the next time.  If, however, call returns false, do not
- * perform call the next time, instead wait 2*n calls before actually calling,
- * with n increasing to the maximum specified in the constructor.
-
- * @param maxBackOff Absolute maximum number of calls to call before actually performing call.
- */
-abstract class DoublingBackOff(maxBackOff:Int = 64) {
-  var invocationsBeforeCall = 0
-  var currentBackOff = 0
-
-  /**
-   * Method to invoke and whose return value will determine the next time 
-   * it is called again.
-   */
-  def call():Boolean
-
-  /**
-   * Possibly execute the call method, based on the result of the previous run.
-   */
-  def maybeCall():Unit = {
-   if(invocationsBeforeCall == 0) {
-      if (call()) {
-        // call succeeded so reset backoff
-        currentBackOff = 0
-      } else {
-        // Call failed, so start backing off
-        currentBackOff = scala.math.min(maxBackOff, nextBackOff(currentBackOff))
-        invocationsBeforeCall = currentBackOff
-      }
-    } else {
-      invocationsBeforeCall -= 1
-    }
-
-  }
-  
-  // 2 * 0 == 0, making getting started a wee bit hard, so we need a little help with that first back off
-  private def nextBackOff(i:Int) = if(i == 0) 1 else 2 * i
-
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
index 97e65eb..04229a6 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
@@ -27,39 +27,153 @@ import org.apache.samza.system.chooser.MessageChooser
 import org.apache.samza.system.chooser.DefaultChooser
 import org.apache.samza.util.BlockingEnvelopeMap
 import org.apache.samza.serializers._
+import org.apache.samza.system.chooser.MockMessageChooser
 
 class TestSystemConsumers {
+  def testPollIntervalMs { // NOTE(review): no @Test annotation, so JUnit does not run this method; confirm it passes before annotating it.
+    val numEnvelopes = 1000
+    val system = "test-system"
+    val systemStreamPartition0 = new SystemStreamPartition(system, "some-stream", new Partition(0))
+    val systemStreamPartition1 = new SystemStreamPartition(system, "some-stream", new Partition(1))
+    val envelope = new IncomingMessageEnvelope(systemStreamPartition0, "1", "k", "v")
+    val consumer = new CustomPollResponseSystemConsumer(envelope)
+    var now = 0L
+    val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer), clock = () => now)
+
+    consumers.register(systemStreamPartition0, "0")
+    consumers.register(systemStreamPartition1, "1234")
+    consumers.start
+
+    // Tell the consumer to respond with 1000 messages for SSP0, and no 
+    // messages for SSP1.
+    consumer.setResponseSizes(numEnvelopes)
+
+    // Choose to trigger a refresh with data.
+    assertNull(consumers.choose)
+    // 2: First on start, second on choose.
+    assertEquals(2, consumer.polls)
+    assertEquals(2, consumer.lastPoll.size)
+    assertTrue(consumer.lastPoll.contains(systemStreamPartition0))
+    assertTrue(consumer.lastPoll.contains(systemStreamPartition1))
+    assertEquals(envelope, consumers.choose)
+    assertEquals(envelope, consumers.choose)
+    // We aren't polling because we're getting non-null envelopes.
+    assertEquals(2, consumer.polls)
+
+    // Advance the clock to trigger a new poll even though there are still 
+    // messages.
+    now = SystemConsumers.DEFAULT_POLL_INTERVAL_MS
+
+    assertEquals(envelope, consumers.choose)
+
+    // We polled even though there are still 997 messages in the unprocessed 
+    // message buffer.
+    assertEquals(3, consumer.polls)
+    assertEquals(1, consumer.lastPoll.size)
+
+    // Only SSP1 was polled because we still have buffered messages for SSP0.
+    assertTrue(consumer.lastPoll.contains(systemStreamPartition1))
+
+    // Now drain all messages for SSP0. There should be exactly 997 messages, 
+    // since we have chosen 3 already, and we started with 1000.
+    (0 until (numEnvelopes - 3)).foreach { i =>
+      assertEquals(envelope, consumers.choose)
+    }
+
+    // Nothing left. Should trigger a poll here.
+    assertNull(consumers.choose)
+    assertEquals(4, consumer.polls)
+    assertEquals(2, consumer.lastPoll.size)
+
+    // Now we ask for messages from both again.
+    assertTrue(consumer.lastPoll.contains(systemStreamPartition0))
+    assertTrue(consumer.lastPoll.contains(systemStreamPartition1))
+  }
+
+  def testBasicSystemConsumersFunctionality { // NOTE(review): no @Test annotation, so JUnit does not run this method; confirm it passes before annotating it.
+    val system = "test-system"
+    val systemStreamPartition = new SystemStreamPartition(system, "some-stream", new Partition(1))
+    val envelope = new IncomingMessageEnvelope(systemStreamPartition, "1", "k", "v")
+    val consumer = new CustomPollResponseSystemConsumer(envelope)
+    var now = 0L
+    val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer), clock = () => now)
+
+    consumers.register(systemStreamPartition, "0")
+    consumers.start
+
+    // Start should trigger a poll to the consumer.
+    assertEquals(1, consumer.polls)
+
+    // Tell the consumer to start returning messages when polled.
+    consumer.setResponseSizes(1)
+
+    // Choose to trigger a refresh with data.
+    assertNull(consumers.choose)
+
+    // Choose should have triggered a second poll, since no messages are available.
+    assertEquals(2, consumer.polls)
+
+    // Choose a few times: the single polled envelope comes back first, then there is no data.
+    assertEquals(envelope, consumers.choose)
+    assertNull(consumers.choose)
+    assertNull(consumers.choose)
+
+    // Return more than one message this time.
+    consumer.setResponseSizes(2)
+
+    // Choose to trigger a refresh with data.
+    assertNull(consumers.choose)
+
+    // Increase clock interval.
+    now = SystemConsumers.DEFAULT_POLL_INTERVAL_MS
+
+    // We get two messages now.
+    assertEquals(envelope, consumers.choose)
+    // Should not poll even though clock interval increases past interval threshold.
+    assertEquals(2, consumer.polls)
+    assertEquals(envelope, consumers.choose)
+    assertNull(consumers.choose)
+  }
+
   @Test
   def testSystemConumersShouldRegisterStartAndStopChooser {
     val system = "test-system"
     val systemStreamPartition = new SystemStreamPartition(system, "some-stream", new Partition(1))
-    var started = 0
-    var stopped = 0
-    var registered = Map[SystemStreamPartition, String]()
+    var consumerStarted = 0
+    var consumerStopped = 0
+    var consumerRegistered = Map[SystemStreamPartition, String]()
+    var chooserStarted = 0
+    var chooserStopped = 0
+    var chooserRegistered = Map[SystemStreamPartition, String]()
 
     val consumer = Map(system -> new SystemConsumer {
-      def start {}
-      def stop {}
-      def register(systemStreamPartition: SystemStreamPartition, offset: String) {}
-      def poll(systemStreamPartitions: java.util.Map[SystemStreamPartition, java.lang.Integer], timeout: Long) = List()
+      def start = consumerStarted += 1
+      def stop = consumerStopped += 1
+      def register(systemStreamPartition: SystemStreamPartition, offset: String) = consumerRegistered += systemStreamPartition -> offset
+      def poll(systemStreamPartitions: java.util.Set[SystemStreamPartition], timeout: Long) = Map[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]]()
     })
 
     val consumers = new SystemConsumers(new MessageChooser {
       def update(envelope: IncomingMessageEnvelope) = Unit
       def choose = null
-      def start = started += 1
-      def stop = stopped += 1
-      def register(systemStreamPartition: SystemStreamPartition, offset: String) = registered += systemStreamPartition -> offset
+      def start = chooserStarted += 1
+      def stop = chooserStopped += 1
+      def register(systemStreamPartition: SystemStreamPartition, offset: String) = chooserRegistered += systemStreamPartition -> offset
     }, consumer, null)
 
     consumers.register(systemStreamPartition, "0")
     consumers.start
     consumers.stop
 
-    assertEquals(1, started)
-    assertEquals(1, stopped)
-    assertEquals(1, registered.size)
-    assertEquals("0", registered(systemStreamPartition))
+    assertEquals(1, chooserStarted)
+    assertEquals(1, chooserStopped)
+    assertEquals(1, chooserRegistered.size)
+    assertEquals("0", chooserRegistered(systemStreamPartition))
+
+    assertEquals(1, consumerStarted)
+    assertEquals(1, consumerStopped)
+    assertEquals(1, consumerRegistered.size)
+    assertEquals("0", consumerRegistered(systemStreamPartition))
   }
 
   @Test
@@ -76,7 +190,7 @@ class TestSystemConsumers {
       def start {}
       def stop {}
       def register(systemStreamPartition: SystemStreamPartition, offset: String) {}
-      def poll(systemStreamPartitions: java.util.Map[SystemStreamPartition, java.lang.Integer], timeout: Long) = List()
+      def poll(systemStreamPartitions: java.util.Set[SystemStreamPartition], timeout: Long) = Map[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]]()
     })
     val consumers = new SystemConsumers(new MessageChooser {
       def update(envelope: IncomingMessageEnvelope) = Unit
@@ -102,16 +216,16 @@ class TestSystemConsumers {
     val system = "test-system"
     val systemStreamPartition = new SystemStreamPartition(system, "some-stream", new Partition(1))
     val msgChooser = new DefaultChooser
-    val consumer = Map(system -> new SimpleConsumer)
+    val consumer = Map(system -> new SerializingConsumer)
     val systemMessageSerdes = Map(system -> (new StringSerde("UTF-8")).asInstanceOf[Serde[Object]]);
     val serdeManager = new SerdeManager(systemMessageSerdes = systemMessageSerdes)
 
-    // it should throw exceptions when the deserialization has error
+    // throw exceptions when the deserialization has error
     val consumers = new SystemConsumers(msgChooser, consumer, serdeManager, dropDeserializationError = false)
     consumers.register(systemStreamPartition, "0")
-    consumers.start
     consumer(system).putBytesMessage
     consumer(system).putStringMessage
+    consumers.start
 
     var caughtRightException = false
     try {
@@ -142,10 +256,35 @@ class TestSystemConsumers {
   }
 
   /**
-   * a simple consumer that provides two extra methods, one is to put bytes format message
-   * and the other to put string format message
+   * A simple MockSystemConsumer that keeps track of what was polled, and lets
+   * you define how many envelopes to return in the poll response. You can
+   * supply the envelope to use for poll responses through the constructor.
+   */
+  private class CustomPollResponseSystemConsumer(envelope: IncomingMessageEnvelope) extends SystemConsumer {
+    var polls = 0 // number of times poll has been called
+    var pollResponse = Map[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]]() // returned as-is by every poll call
+    var lastPoll: java.util.Set[SystemStreamPartition] = null // SSP set passed to the most recent poll (null before the first poll)
+    def start {}
+    def stop {}
+    def register(systemStreamPartition: SystemStreamPartition, offset: String) {}
+    def poll(systemStreamPartitions: java.util.Set[SystemStreamPartition], timeout: Long) = {
+      polls += 1
+      lastPoll = systemStreamPartitions
+      pollResponse
+    }
+    def setResponseSizes(numEnvelopes: Int) {
+      val q = new java.util.ArrayList[IncomingMessageEnvelope]()
+      (0 until numEnvelopes).foreach { i => q.add(envelope) }
+      // Every later poll returns this same list (numEnvelopes copies of envelope), keyed by the envelope's SSP.
+      pollResponse = Map(envelope.getSystemStreamPartition -> q)
+    }
+  }
+
+  /**
+   * A simple consumer that provides two extra methods: one is to put bytes
+   * format message and the other to put string format message.
    */
-  private class SimpleConsumer extends BlockingEnvelopeMap {
+  private class SerializingConsumer extends BlockingEnvelopeMap {
     val systemStreamPartition = new SystemStreamPartition("test-system", "some-stream", new Partition(1))
     def putBytesMessage {
       put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, "0", "0", "test".getBytes()))

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBufferingMessageChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBufferingMessageChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBufferingMessageChooser.scala
deleted file mode 100644
index c96c53b..0000000
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBufferingMessageChooser.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.system.chooser
-
-import org.apache.samza.system.chooser.ChooserStatus._
-import org.apache.samza.system.IncomingMessageEnvelope
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.Partition
-import org.junit.Assert._
-import org.junit.Test
-
-class TestBufferingMessageChooser {
-  @Test
-  def testShouldBufferAndFlush {
-    val ssp1 = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
-    val chooser = new MockMessageChooser
-    val buffer = new BufferingMessageChooser(chooser)
-    val envelope1 = new IncomingMessageEnvelope(ssp1, "1", null, null)
-    buffer.register(ssp1, "1")
-    assertEquals(1, chooser.registers.size)
-    assertEquals("1", chooser.registers.getOrElse(ssp1, "0"))
-    buffer.start
-    assertEquals(1, chooser.starts)
-    assertEquals(null, buffer.choose)
-    buffer.update(envelope1)
-    // Should buffer this update, rather than passing it to the wrapped chooser.
-    assertEquals(null, buffer.choose)
-    buffer.flush
-    assertEquals(envelope1, buffer.choose)
-    assertEquals(null, buffer.choose)
-    buffer.stop
-    assertEquals(1, chooser.stops)
-  }
-
-  @Test
-  def testBufferShouldSkipCheckedSSPs {
-    val ssp1 = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
-    val chooser = new MockMessageChooser
-    val buffer = new BufferingMessageChooser(chooser)
-    val envelope1 = new IncomingMessageEnvelope(ssp1, "1", null, null)
-    buffer.register(ssp1, "1")
-    buffer.start
-    checkChooserStatus(NeededByChooser, ssp1, buffer)
-
-    // Buffer first message. Still needed.
-    buffer.update(envelope1)
-    checkChooserStatus(NeededByChooser, ssp1, buffer)
-    assertEquals(null, buffer.choose)
-    checkChooserStatus(NeededByChooser, ssp1, buffer)
-
-    // Flush first message. Now in chooser.
-    buffer.flush
-    checkChooserStatus(InChooser, ssp1, buffer)
-    assertEquals(envelope1, buffer.choose)
-    checkChooserStatus(NeededByChooser, ssp1, buffer)
-
-    // Now flush with no messages. Should begin skipping chooser since no 
-    // messages are available.
-    assertEquals(null, buffer.choose)
-    checkChooserStatus(NeededByChooser, ssp1, buffer)
-    buffer.flush
-    checkChooserStatus(SkippingChooser, ssp1, buffer)
-
-    // Now check that we can get back to NeededByChooser when a new message 
-    // arrives.
-    buffer.update(envelope1)
-    checkChooserStatus(NeededByChooser, ssp1, buffer)
-    assertEquals(0, chooser.envelopes.size)
-    assertEquals(null, buffer.choose)
-
-    // And check that we can get back into the InChooser state.
-    buffer.flush
-    checkChooserStatus(InChooser, ssp1, buffer)
-    assertEquals(envelope1, buffer.choose)
-    checkChooserStatus(NeededByChooser, ssp1, buffer)
-
-    // Shutdown.
-    buffer.stop
-    assertEquals(1, chooser.stops)
-  }
-
-  private def checkChooserStatus(status: ChooserStatus, systemStreamPartition: SystemStreamPartition, buffer: BufferingMessageChooser) {
-    if (status.equals(NeededByChooser)) {
-      assertEquals(Set(systemStreamPartition), buffer.neededByChooser)
-    } else {
-      assertTrue(!buffer.neededByChooser.contains(systemStreamPartition))
-    }
-
-    assertEquals(status, buffer.statuses.getOrElse(systemStreamPartition, null))
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala
index b2e04a7..d31c3ce 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala
@@ -88,39 +88,42 @@ class TestFileReaderSystemConsumer {
     consumer.start
     Thread.sleep(500)
 
-    val number: Integer = 1000
-    val ssp1Number: java.util.Map[SystemStreamPartition, Integer] = HashMap(ssp1 -> number)
-    val ssp2Number: java.util.Map[SystemStreamPartition, Integer] = HashMap(ssp2 -> number)
-    val ssp3Number: java.util.Map[SystemStreamPartition, Integer] = HashMap(ssp3 -> number)
-    val ssp4Number: java.util.Map[SystemStreamPartition, Integer] = HashMap(ssp4 -> number)
-    val ssp5Number: java.util.Map[SystemStreamPartition, Integer] = HashMap(ssp5 -> number)
-
-    val ssp1Result = consumer.poll(ssp1Number, 1000)
-    val ssp2Result = consumer.poll(ssp2Number, 1000)
-    val ssp3Result = consumer.poll(ssp3Number, 1000)
-    val ssp4Result = consumer.poll(ssp4Number, 1000)
+    val ssp1Result = consumer.poll(Set(ssp1), 1000)
+    val ssp2Result = consumer.poll(Set(ssp2), 1000)
+    val ssp3Result = consumer.poll(Set(ssp3), 1000)
+    val ssp4Result = consumer.poll(Set(ssp4), 1000)
 
     assertEquals(0, ssp1Result.size)
     assertEquals(0, ssp2Result.size)
 
     assertEquals(1, ssp3Result.size)
-    assertEquals("first line ", ssp3Result(0).getMessage)
-    assertEquals("0", ssp3Result(0).getOffset)
+    assertEquals(1, ssp3Result.get(ssp3).size)
+    var envelope = ssp3Result.get(ssp3).remove(0)
+    assertEquals("first line ", envelope.getMessage)
+    assertEquals("0", envelope.getOffset)
 
     assertEquals(1, ssp4Result.size)
-    assertEquals("second line ", ssp4Result(0).getMessage)
-    assertEquals("12", ssp4Result(0).getOffset)
+    assertEquals(1, ssp4Result.get(ssp4).size)
+    envelope = ssp4Result.get(ssp4).remove(0)
+    assertEquals("second line ", envelope.getMessage)
+    assertEquals("12", envelope.getOffset)
 
     appendFile
     Thread.sleep(1000)
 
     // ssp5 should read the new lines
-    val ssp5Result = consumer.poll(ssp5Number, 1000)
-    assertEquals(3, ssp5Result.size)
-    assertEquals("This is a new line", ssp5Result(2).getMessage)
-    assertEquals("50", ssp5Result(2).getOffset)
-    assertEquals("other lines ", ssp5Result(1).getMessage)
-    assertEquals("37", ssp5Result(1).getOffset)
+    val ssp5Result = consumer.poll(Set(ssp5), 1000)
+    assertEquals(1, ssp5Result.size)
+    assertEquals(3, ssp5Result.get(ssp5).size)
+    envelope = ssp5Result.get(ssp5).remove(0)
+    assertEquals("third line ", envelope.getMessage)
+    assertEquals("25", envelope.getOffset)
+    envelope = ssp5Result.get(ssp5).remove(0)
+    assertEquals("other lines ", envelope.getMessage)
+    assertEquals("37", envelope.getOffset)
+    envelope = ssp5Result.get(ssp5).remove(0)
+    assertEquals("This is a new line", envelope.getMessage)
+    assertEquals("50", envelope.getOffset)
 
     consumer.stop
   }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-core/src/test/scala/org/apache/samza/util/TestDoublingBackOff.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestDoublingBackOff.scala b/samza-core/src/test/scala/org/apache/samza/util/TestDoublingBackOff.scala
deleted file mode 100644
index eaeb005..0000000
--- a/samza-core/src/test/scala/org/apache/samza/util/TestDoublingBackOff.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.util
-
-import org.junit.Test
-import org.junit.Assert._
-
-class TestDoublingBackOff {
-  @Test def zeroBackOffWorks() {
-    var counter = 0
-    val zeroBackOff = new DoublingBackOff(0) {
-      def call(): Boolean = {
-        counter += 1
-        true
-      }
-    }
-    
-    for(i <- 0 to 1000) {
-      assertEquals(i, counter)
-      zeroBackOff.maybeCall()
-    }
-  }
-  
-  @Test def backOffWorks() {
-    val toReturn = List(true, false, true, true)
-    var counter = 0
-    val ebo = new DoublingBackOff() {
-      def call(): Boolean = {
-        counter += 1
-        toReturn(counter - 1)
-      }
-    }
-    
-    ebo.maybeCall() // will get back true
-    assertEquals(1, counter) 
-    ebo.maybeCall() // will get back false
-    assertEquals(2, counter)
-    ebo.maybeCall() // last false means we won't actually call, will hold off for one iteration
-    assertEquals(2, counter)
-    ebo.maybeCall() // and call on this one, which gives back true
-    assertEquals(3, counter)
-    ebo.maybeCall() // so we immediately call again and increment
-    assertEquals(4, counter)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
index 23d122e..1661b43 100644
--- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
+++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
@@ -74,7 +74,7 @@ class TestPerformanceTask extends StreamTask with InitableTask with Logging {
   /**
    * How many messages to process before shutting down.
    */
-  var maxMessages = 100000
+  var maxMessages = 10000000
 
   /**
    * If defined, incoming messages will be forwarded to this SystemStream. If
@@ -84,7 +84,7 @@ class TestPerformanceTask extends StreamTask with InitableTask with Logging {
 
   def init(config: Config, context: TaskContext) {
     logInterval = config.getInt("task.log.interval", 10000)
-    maxMessages = config.getInt("task.max.messages", 100000)
+    maxMessages = config.getInt("task.max.messages", 10000000)
     outputSystemStream = Option(config.get("task.outputs", null)).map(Util.getSystemStreamFromNames(_))
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala b/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala
index 4016768..1f4c247 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala
@@ -75,7 +75,7 @@ class TestSamzaContainerPerformance extends Logging{
   val partitionsPerStreamCount = System.getProperty("samza.mock.partitions.per.stream", "4").toInt
   val brokerSleepMs = System.getProperty("samza.mock.broker.sleep.ms", "1").toInt
   var logInterval = System.getProperty("samza.task.log.interval", "10000").toInt
-  var maxMessages = System.getProperty("samza.task.max.messages", "100000").toInt
+  var maxMessages = System.getProperty("samza.task.max.messages", "10000000").toInt
 
   val jobConfig = Map(
     "job.factory.class" -> "org.apache.samza.job.local.LocalJobFactory",