Posted to commits@samza.apache.org by cr...@apache.org on 2013/10/05 00:10:04 UTC

[1/2] SAMZA-2; adding pluggable message chooser.

Updated Branches:
  refs/heads/master bb0abb670 -> 7bbde2cc2


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
new file mode 100644
index 0000000..8f5fb66
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.chooser
+
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.samza.system.IncomingMessageEnvelope
+import scala.collection.immutable.Queue
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.Partition
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.runners.Parameterized.Parameters
+import java.util.Arrays
+
+@RunWith(value = classOf[Parameterized])
+class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStreamPartition, String]) => MessageChooser) {
+  val envelope1 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), null, null, 1);
+  val envelope2 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), null, null, 2);
+  val envelope3 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(0)), null, null, 3);
+  val envelope4 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), "123", null, 4);
+
+  @Test
+  def testChooserShouldIgnoreStreamsThatArentInOffsetMap {
+    val mock = new MockMessageChooser
+    val chooser = getChooser(mock, Map())
+
+    chooser.register(envelope1.getSystemStreamPartition, "foo")
+    chooser.start
+    assertEquals(1, mock.starts)
+    assertEquals("foo", mock.registers(envelope1.getSystemStreamPartition))
+    chooser.update(envelope1)
+    assertEquals(envelope1, chooser.choose)
+    assertEquals(null, chooser.choose)
+    chooser.stop
+    assertEquals(1, mock.stops)
+  }
+
+  @Test
+  def testChooserShouldEliminateCaughtUpStreamsOnRegister {
+    val mock = new MockMessageChooser
+    val chooser = getChooser(mock, Map(envelope1.getSystemStreamPartition -> "123"))
+
+    // Even though envelope1's SSP is registered as a bootstrap stream, its 
+    // registered offset (123) matches the last offset in the offset map, so 
+    // it should be marked as "caught up" and treated like a normal stream. 
+    // This means envelopes from the non-bootstrap stream should be allowed 
+    // to be chosen.
+    chooser.register(envelope1.getSystemStreamPartition, "123")
+    chooser.register(envelope2.getSystemStreamPartition, "321")
+    chooser.start
+    chooser.update(envelope2)
+    assertEquals(envelope2, chooser.choose)
+    assertEquals(null, chooser.choose)
+  }
+
+  @Test
+  def testChooserShouldEliminateCaughtUpStreamsAfterRegister {
+    val mock = new MockMessageChooser
+    val chooser = getChooser(mock, Map(envelope1.getSystemStreamPartition -> "123"))
+
+    // envelope1's SSP is registered as a bootstrap stream with a last offset 
+    // of 123, but it registers at offset 1, so it is not yet caught up. The 
+    // chooser should block all other streams until envelope1's SSP has been 
+    // read up to offset 123 (envelope4).
+    chooser.register(envelope1.getSystemStreamPartition, "1")
+    chooser.register(envelope2.getSystemStreamPartition, null)
+    chooser.start
+    chooser.update(envelope2)
+    // Choose should not return anything since bootstrapper is blocking 
+    // wrapped.choose until it gets an update from envelope1's SSP.
+    assertEquals(null, chooser.choose)
+    chooser.update(envelope1)
+    // Now that we have an update from the required SSP, the mock chooser 
+    // should be called, and return.
+    assertEquals(envelope2, chooser.choose)
+    // The chooser still has an envelope from envelope1's SSP, so it should 
+    // return.
+    assertEquals(envelope1, chooser.choose)
+    // No envelope for envelope1's SSP has been given, so it should block.
+    chooser.update(envelope2)
+    assertEquals(null, chooser.choose)
+    // Now we're giving an envelope with the proper last offset (123), so 
+    // envelope1's SSP should be treated no differently than envelope2's.
+    chooser.update(envelope4)
+    assertEquals(envelope2, chooser.choose)
+    assertEquals(envelope4, chooser.choose)
+    assertEquals(null, chooser.choose)
+    // Should not block here since there are no more lagging bootstrap streams.
+    chooser.update(envelope2)
+    assertEquals(envelope2, chooser.choose)
+    assertEquals(null, chooser.choose)
+    chooser.update(envelope2)
+    assertEquals(envelope2, chooser.choose)
+    assertEquals(null, chooser.choose)
+  }
+
+  @Test
+  def testChooserShouldWorkWithTwoBootstrapStreams {
+    val mock = new MockMessageChooser
+    val chooser = getChooser(mock, Map(
+      envelope1.getSystemStreamPartition -> "123",
+      envelope2.getSystemStreamPartition -> "321"))
+
+    chooser.register(envelope1.getSystemStreamPartition, "1")
+    chooser.register(envelope2.getSystemStreamPartition, "1")
+    chooser.register(envelope3.getSystemStreamPartition, "1")
+    chooser.start
+    chooser.update(envelope1)
+    assertEquals(null, chooser.choose)
+    chooser.update(envelope3)
+    assertEquals(null, chooser.choose)
+    chooser.update(envelope2)
+
+    // Fully loaded now.
+    assertEquals(envelope1, chooser.choose)
+    // Can't pick again because envelope1's SSP is missing.
+    assertEquals(null, chooser.choose)
+    chooser.update(envelope1)
+    // Can pick again.
+    assertEquals(envelope3, chooser.choose)
+    // Can still pick since envelope3.SSP isn't being tracked.
+    assertEquals(envelope2, chooser.choose)
+    // Can't pick since envelope2.SSP needs an envelope now.
+    assertEquals(null, chooser.choose)
+    chooser.update(envelope2)
+    // Now we get envelope1 again.
+    assertEquals(envelope1, chooser.choose)
+    // Can't pick again.
+    assertEquals(null, chooser.choose)
+    // Now use envelope4, to trigger "all caught up" for envelope1.SSP.
+    chooser.update(envelope4)
+    // The chooser's contents are currently: e2, e4 (see mock.getEnvelopes).
+    // Add envelope3, whose SSP isn't being tracked.
+    chooser.update(envelope3)
+    assertEquals(envelope2, chooser.choose)
+    assertEquals(null, chooser.choose)
+    chooser.update(envelope2)
+    // The chooser's contents are currently: e4, e3, e2 (see mock.getEnvelopes).
+    assertEquals(envelope4, chooser.choose)
+    // This should be allowed, even though no message from envelope1.SSP is 
+    // available, since envelope4 triggered "all caught up" because its offset 
+    // matches the offset map for this SSP, and we still have an envelope for 
+    // envelope2.SSP in the queue.
+    assertEquals(envelope3, chooser.choose)
+    assertEquals(envelope2, chooser.choose)
+    assertEquals(null, chooser.choose)
+    // Fin.
+  }
+}
+
+object TestBootstrappingChooser {
+  // Test both BootstrappingChooser and DefaultChooser here. DefaultChooser 
+  // with just bootstrap stream offsets defined should behave just like a 
+  // plain vanilla bootstrapping chooser.
+  @Parameters
+  def parameters: java.util.Collection[Array[(MessageChooser, Map[SystemStreamPartition, String]) => MessageChooser]] = Arrays.asList(
+    Array((wrapped: MessageChooser, latestMessageOffsets: Map[SystemStreamPartition, String]) => new BootstrappingChooser(wrapped, latestMessageOffsets)),
+    Array((wrapped: MessageChooser, latestMessageOffsets: Map[SystemStreamPartition, String]) => new DefaultChooser(wrapped, bootstrapStreamOffsets = latestMessageOffsets)))
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
new file mode 100644
index 0000000..12e12c0
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.chooser
+
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.samza.util.BlockingEnvelopeMap
+import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.Partition
+
+class TestDefaultChooser {
+  val envelope1 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), null, null, 1);
+  val envelope2 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), null, null, 2);
+  val envelope3 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream2", new Partition(0)), null, null, 3);
+  val envelope4 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), "123", null, 4);
+  val envelope5 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(1)), null, null, 5);
+  val envelope6 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(1)), "321", null, 6);
+  val envelope7 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(0)), null, null, 7);
+
+  @Test
+  def testDefaultChooserWithBatchingPrioritizationAndBootstrapping {
+    val mock0 = new MockMessageChooser
+    val mock1 = new MockMessageChooser
+    val mock2 = new MockMessageChooser
+    val chooser = new DefaultChooser(
+      mock0,
+      Some(2),
+      Map(
+        envelope1.getSystemStreamPartition().getSystemStream -> Int.MaxValue,
+        envelope2.getSystemStreamPartition().getSystemStream -> 1),
+      Map(
+        Int.MaxValue -> mock1,
+        1 -> mock2),
+      Map(
+        envelope1.getSystemStreamPartition() -> "123",
+        envelope5.getSystemStreamPartition() -> "321"))
+
+    chooser.register(envelope1.getSystemStreamPartition, null)
+    chooser.register(envelope2.getSystemStreamPartition, null)
+    chooser.register(envelope3.getSystemStreamPartition, null)
+    chooser.register(envelope5.getSystemStreamPartition, null)
+    chooser.start
+    assertEquals(null, chooser.choose)
+
+    // Load with a non-bootstrap stream, and should still get null.
+    chooser.update(envelope3)
+    assertEquals(null, chooser.choose)
+
+    // Load with a bootstrap stream, should get that envelope.
+    chooser.update(envelope1)
+    assertEquals(envelope1, chooser.choose)
+
+    // Should block envelope3 since we have no message from envelope1's bootstrap stream.
+    assertEquals(null, chooser.choose)
+
+    // Load envelope2 from non-bootstrap stream with higher priority than envelope3.
+    chooser.update(envelope2)
+
+    // Should block envelope2 since we have no message from envelope1's bootstrap stream.
+    assertEquals(null, chooser.choose)
+
+    // Test batching by giving the chooser envelope1 and envelope5, both from the same stream; envelope1's partition should be preferred.
+    chooser.update(envelope5)
+    chooser.update(envelope1)
+    assertEquals(envelope1, chooser.choose)
+
+    // Now, envelope5 is still loaded, and we've reached our batchSize limit, so loading envelope1 should still let envelope5 through.
+    chooser.update(envelope1)
+    assertEquals(envelope5, chooser.choose)
+    assertEquals(envelope1, chooser.choose)
+    assertEquals(null, chooser.choose)
+
+    // Now we're back to just envelope3, envelope2. Let's catch up envelope1's SSP using envelope4's offset.
+    chooser.update(envelope4)
+    assertEquals(envelope4, chooser.choose)
+
+    // Should still block envelope2 and envelope3 because the second bootstrap partition (envelope5's SSP) hasn't caught up yet.
+    assertEquals(null, chooser.choose)
+
+    // Now catch up the second partition.
+    chooser.update(envelope6)
+    assertEquals(envelope6, chooser.choose)
+
+    // Cool, now no streams are being bootstrapped. Envelope2 should be prioritized above envelope3, even though envelope3 was added first.
+    assertEquals(envelope2, chooser.choose)
+
+    // We should still batch, and prefer envelope2's partition over envelope7, even though they're both from the same stream.
+    chooser.update(envelope7)
+    chooser.update(envelope2)
+    assertEquals(envelope2, chooser.choose)
+
+    // Now envelope2's partition has passed the batchSize, so we should get 7 next.
+    chooser.update(envelope2)
+    assertEquals(envelope7, chooser.choose)
+    assertEquals(envelope2, chooser.choose)
+
+    // Now we should finally get the lowest priority non-bootstrap stream, envelope3.
+    assertEquals(envelope3, chooser.choose)
+    assertEquals(null, chooser.choose)
+  }
+}
+
+class MockBlockingEnvelopeMap extends BlockingEnvelopeMap {
+  def start {}
+  def stop {}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-core/src/test/scala/org/apache/samza/system/chooser/TestRoundRobinChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestRoundRobinChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestRoundRobinChooser.scala
new file mode 100644
index 0000000..01802b9
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestRoundRobinChooser.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.chooser
+
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.samza.Partition
+import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.system.SystemStreamPartition
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.runners.Parameterized.Parameters
+import java.util.Arrays
+
+@RunWith(value = classOf[Parameterized])
+class TestRoundRobinChooser(getChooser: () => MessageChooser) {
+  @Test
+  def testRoundRobinChooser {
+    val chooser = getChooser()
+    val envelope1 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), null, null, 1);
+    val envelope2 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(1)), null, null, 2);
+    val envelope3 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(0)), null, null, 3);
+
+    chooser.register(envelope1.getSystemStreamPartition, null)
+    chooser.register(envelope2.getSystemStreamPartition, "")
+    chooser.register(envelope3.getSystemStreamPartition, "123")
+    chooser.start
+
+    assertEquals(null, chooser.choose)
+
+    // Test one message.
+    chooser.update(envelope1)
+    assertEquals(envelope1, chooser.choose)
+    assertEquals(null, chooser.choose)
+
+    // Verify simple ordering.
+    chooser.update(envelope1)
+    chooser.update(envelope2)
+    chooser.update(envelope3)
+
+    assertEquals(envelope1, chooser.choose)
+    assertEquals(envelope2, chooser.choose)
+    assertEquals(envelope3, chooser.choose)
+    assertEquals(null, chooser.choose)
+
+    // Verify mixed ordering.
+    chooser.update(envelope2)
+    chooser.update(envelope1)
+
+    assertEquals(envelope2, chooser.choose)
+    assertEquals(envelope1, chooser.choose)
+
+    chooser.update(envelope1)
+    chooser.update(envelope2)
+
+    assertEquals(envelope1, chooser.choose)
+    assertEquals(envelope2, chooser.choose)
+    assertEquals(null, chooser.choose)
+
+    // Verify simple ordering with different starting envelope.
+    chooser.update(envelope2)
+    chooser.update(envelope1)
+    chooser.update(envelope3)
+
+    assertEquals(envelope2, chooser.choose)
+    assertEquals(envelope1, chooser.choose)
+    assertEquals(envelope3, chooser.choose)
+    assertEquals(null, chooser.choose)
+  }
+}
+
+object TestRoundRobinChooser {
+  // Test both RoundRobinChooser and DefaultChooser here. DefaultChooser with 
+  // no batching, prioritization, or bootstrapping should default to just a 
+  // plain vanilla round robin chooser.
+  @Parameters
+  def parameters: java.util.Collection[Array[() => MessageChooser]] = Arrays.asList(Array(() => new RoundRobinChooser), Array(() => new DefaultChooser))
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-core/src/test/scala/org/apache/samza/system/chooser/TestTieredPriorityChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestTieredPriorityChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestTieredPriorityChooser.scala
new file mode 100644
index 0000000..4cde630
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestTieredPriorityChooser.scala
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.chooser
+
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.samza.system.IncomingMessageEnvelope
+import scala.collection.immutable.Queue
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.Partition
+import org.apache.samza.SamzaException
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.runners.Parameterized.Parameters
+import java.util.Arrays
+import org.apache.samza.system.SystemStream
+
+@RunWith(value = classOf[Parameterized])
+class TestTieredPriorityChooser(getChooser: (Map[SystemStream, Int], Map[Int, MessageChooser], MessageChooser) => MessageChooser) {
+  val envelope1 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), null, null, 1);
+  val envelope2 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), null, null, 2);
+  val envelope3 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(0)), null, null, 3);
+  val envelope4 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), "123", null, 4);
+
+  @Test
+  def testChooserShouldStartStopAndRegister {
+    val mock0 = new MockMessageChooser
+    val mock1 = new MockMessageChooser
+    val chooser = getChooser(
+      Map(envelope1.getSystemStreamPartition -> 1),
+      Map(1 -> mock1),
+      mock0)
+
+    chooser.register(envelope1.getSystemStreamPartition, "foo")
+    chooser.start
+    assertEquals("foo", mock1.registers(envelope1.getSystemStreamPartition))
+    assertEquals(1, mock0.starts)
+    assertEquals(1, mock1.starts)
+    chooser.stop
+    assertEquals(1, mock0.stops)
+    assertEquals(1, mock1.stops)
+  }
+
+  @Test
+  def testChooserShouldFallBackToDefault {
+    val mock = new MockMessageChooser
+    val chooser = getChooser(
+      Map(),
+      Map(),
+      mock)
+
+    chooser.register(envelope1.getSystemStreamPartition, null)
+    chooser.start
+    assertEquals(null, chooser.choose)
+    chooser.update(envelope1)
+    assertEquals(envelope1, chooser.choose)
+    assertEquals(null, chooser.choose)
+  }
+
+  @Test
+  def testChooserShouldFailWithNoDefault {
+    val mock = new MockMessageChooser
+    val chooser = getChooser(
+      Map(envelope1.getSystemStreamPartition.getSystemStream -> 0),
+      Map(0 -> mock),
+      null)
+
+    // The SSP for envelope2 is not defined as a priority stream.
+    chooser.register(envelope2.getSystemStreamPartition, null)
+    chooser.start
+    assertEquals(null, chooser.choose)
+
+    try {
+      chooser.update(envelope2)
+      fail("Should have failed due to missing default chooser.")
+    } catch {
+      case e: SamzaException => // Expected.
+    }
+  }
+
+  @Test
+  def testChooserWithSingleStream {
+    val mock = new MockMessageChooser
+    val chooser = getChooser(
+      Map(envelope1.getSystemStreamPartition.getSystemStream -> 0),
+      Map(0 -> mock),
+      new MockMessageChooser)
+
+    chooser.register(envelope1.getSystemStreamPartition, null)
+    chooser.start
+
+    assertEquals(null, chooser.choose)
+    chooser.update(envelope1)
+    chooser.update(envelope4)
+    assertEquals(envelope1, chooser.choose)
+    assertEquals(envelope4, chooser.choose)
+    assertEquals(null, chooser.choose)
+
+    chooser.update(envelope4)
+    chooser.update(envelope1)
+    assertEquals(envelope4, chooser.choose)
+    assertEquals(envelope1, chooser.choose)
+    assertEquals(null, chooser.choose)
+  }
+
+  @Test
+  def testChooserWithSingleStreamWithTwoPartitions {
+    val mock = new MockMessageChooser
+    val chooser = getChooser(
+      Map(envelope2.getSystemStreamPartition.getSystemStream -> 0),
+      Map(0 -> mock),
+      new MockMessageChooser)
+
+    chooser.register(envelope2.getSystemStreamPartition, null)
+    chooser.register(envelope3.getSystemStreamPartition, null)
+    chooser.start
+
+    assertEquals(null, chooser.choose)
+    chooser.update(envelope2)
+    chooser.update(envelope3)
+    assertEquals(envelope2, chooser.choose)
+    assertEquals(envelope3, chooser.choose)
+    assertEquals(null, chooser.choose)
+
+    chooser.update(envelope3)
+    chooser.update(envelope2)
+    assertEquals(envelope3, chooser.choose)
+    assertEquals(envelope2, chooser.choose)
+    assertEquals(null, chooser.choose)
+  }
+
+  @Test
+  def testChooserWithTwoStreamsOfEqualPriority {
+    val mock = new MockMessageChooser
+    val chooser = getChooser(
+      Map(
+        envelope1.getSystemStreamPartition.getSystemStream -> 0,
+        envelope2.getSystemStreamPartition.getSystemStream -> 0),
+      Map(0 -> mock),
+      new MockMessageChooser)
+
+    chooser.register(envelope1.getSystemStreamPartition, null)
+    chooser.register(envelope2.getSystemStreamPartition, null)
+    chooser.start
+
+    assertEquals(null, chooser.choose)
+    chooser.update(envelope1)
+    chooser.update(envelope4)
+    assertEquals(envelope1, chooser.choose)
+    assertEquals(envelope4, chooser.choose)
+    assertEquals(null, chooser.choose)
+
+    chooser.update(envelope4)
+    chooser.update(envelope1)
+    assertEquals(envelope4, chooser.choose)
+    assertEquals(envelope1, chooser.choose)
+    assertEquals(null, chooser.choose)
+
+    chooser.update(envelope2)
+    chooser.update(envelope4)
+    assertEquals(envelope2, chooser.choose)
+    assertEquals(envelope4, chooser.choose)
+    assertEquals(null, chooser.choose)
+
+    chooser.update(envelope1)
+    chooser.update(envelope2)
+    assertEquals(envelope1, chooser.choose)
+    assertEquals(envelope2, chooser.choose)
+    assertEquals(null, chooser.choose)
+  }
+
+  @Test
+  def testChooserWithTwoStreamsOfDifferentPriority {
+    val mock0 = new MockMessageChooser
+    val mock1 = new MockMessageChooser
+    val chooser = getChooser(
+      Map(
+        envelope1.getSystemStreamPartition.getSystemStream -> 1,
+        envelope2.getSystemStreamPartition.getSystemStream -> 0),
+      Map(
+        0 -> mock0,
+        1 -> mock1),
+      new MockMessageChooser)
+
+    chooser.register(envelope1.getSystemStreamPartition, null)
+    chooser.register(envelope2.getSystemStreamPartition, null)
+    chooser.start
+
+    assertEquals(null, chooser.choose)
+    chooser.update(envelope1)
+    chooser.update(envelope4)
+    assertEquals(envelope1, chooser.choose)
+    assertEquals(envelope4, chooser.choose)
+    assertEquals(null, chooser.choose)
+
+    chooser.update(envelope4)
+    chooser.update(envelope1)
+    assertEquals(envelope4, chooser.choose)
+    assertEquals(envelope1, chooser.choose)
+    assertEquals(null, chooser.choose)
+
+    chooser.update(envelope2)
+    chooser.update(envelope4)
+    // Reversed here because envelope4.SSP=envelope1.SSP which is higher 
+    // priority.
+    assertEquals(envelope4, chooser.choose)
+    assertEquals(envelope2, chooser.choose)
+    assertEquals(null, chooser.choose)
+
+    chooser.update(envelope1)
+    chooser.update(envelope2)
+    assertEquals(envelope1, chooser.choose)
+    assertEquals(envelope2, chooser.choose)
+    assertEquals(null, chooser.choose)
+
+    // Just the low priority stream.
+    chooser.update(envelope2)
+    assertEquals(envelope2, chooser.choose)
+    assertEquals(null, chooser.choose)
+  }
+}
+
+object TestTieredPriorityChooser {
+  // Test both TieredPriorityChooser and DefaultChooser here. DefaultChooser 
+  // with just priorities defined should behave just like a plain vanilla 
+  // tiered priority chooser.
+  @Parameters
+  def parameters: java.util.Collection[Array[(Map[SystemStream, Int], Map[Int, MessageChooser], MessageChooser) => MessageChooser]] = Arrays.asList(
+    Array((priorities: Map[SystemStream, Int], choosers: Map[Int, MessageChooser], default: MessageChooser) => new TieredPriorityChooser(priorities, choosers, default)),
+    Array((priorities: Map[SystemStream, Int], choosers: Map[Int, MessageChooser], default: MessageChooser) => new DefaultChooser(default, None, priorities, choosers)))
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index 183c6cc..c5487b9 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -25,19 +25,36 @@ import org.apache.samza.util.ClientUtilTopicMetadataStore
 import kafka.api.TopicMetadata
 import scala.collection.JavaConversions._
 import org.apache.samza.system.SystemAdmin
+import org.apache.samza.SamzaException
+import kafka.consumer.SimpleConsumer
+import kafka.utils.Utils
+import kafka.client.ClientUtils
+import java.util.Random
+import kafka.api.TopicMetadataRequest
+import kafka.common.TopicAndPartition
+import kafka.api.PartitionOffsetRequestInfo
+import kafka.api.OffsetRequest
+import kafka.api.FetchRequestBuilder
+import org.apache.samza.system.SystemStreamPartition
+import kafka.common.ErrorMapping
+import grizzled.slf4j.Logging
 
 class KafkaSystemAdmin(
   systemName: String,
   // TODO whenever Kafka decides to make the Set[Broker] class public, let's switch to Set[Broker] here.
   brokerListString: String,
-  clientId: String = UUID.randomUUID.toString) extends SystemAdmin {
+  timeout: Int = Int.MaxValue,
+  bufferSize: Int = 1024000,
+  clientId: String = UUID.randomUUID.toString) extends SystemAdmin with Logging {
 
-  def getPartitions(streamName: String): java.util.Set[Partition] = {
-    val getTopicMetadata = (topics: Set[String]) => {
-      new ClientUtilTopicMetadataStore(brokerListString, clientId)
-        .getTopicInfo(topics)
-    }
+  val rand = new Random
+
+  private def getTopicMetadata(topics: Set[String]) = {
+    new ClientUtilTopicMetadataStore(brokerListString, clientId)
+      .getTopicInfo(topics)
+  }
 
+  def getPartitions(streamName: String): java.util.Set[Partition] = {
     val metadata = TopicMetadataCache.getTopicMetadata(
       Set(streamName),
       systemName,
@@ -48,4 +65,93 @@ class KafkaSystemAdmin(
       .map(pm => new Partition(pm.partitionId))
       .toSet[Partition]
   }
+
+  def getLastOffsets(streams: java.util.Set[String]) = {
+    var offsets = Map[SystemStreamPartition, String]()
+    var done = false
+    var consumer: SimpleConsumer = null
+
+    debug("Fetching offsets for: %s" format streams)
+
+    while (!done) {
+      try {
+        val metadata = TopicMetadataCache.getTopicMetadata(
+          streams.toSet,
+          systemName,
+          getTopicMetadata)
+
+        debug("Got metadata for streams: %s" format metadata)
+
+        // Break the topic metadata's topic/partitions into a per-broker map.
+        val brokersToTopicPartitions = metadata
+          .values
+          // Convert the topic metadata to a Seq[(Broker, TopicAndPartition)] 
+          .flatMap(topicMetadata => topicMetadata
+            .partitionsMetadata
+            // Convert Seq[PartitionMetadata] to Seq[(Broker, TopicAndPartition)]
+            .map(partitionMetadata => {
+              ErrorMapping.maybeThrowException(partitionMetadata.errorCode)
+              val topicAndPartition = new TopicAndPartition(topicMetadata.topic, partitionMetadata.partitionId)
+              val leader = partitionMetadata
+                .leader
+                .getOrElse(throw new SamzaException("Need leaders for all partitions when fetching offsets. No leader available for TopicAndPartition: %s" format topicAndPartition))
+              (leader, topicAndPartition)
+            }))
+          // Convert to a Map[Broker, Seq[(Broker, TopicAndPartition)]]
+          .groupBy(_._1)
+          // Convert to a Map[Broker, Seq[TopicAndPartition]]
+          .mapValues(_.map(_._2))
+
+        debug("Got topic partition data for brokers: %s" format brokersToTopicPartitions)
+
+        // Get the latest offsets for each topic and partition.
+        for ((broker, topicsAndPartitions) <- brokersToTopicPartitions) {
+          val partitionOffsetInfo = topicsAndPartitions
+            .map(topicAndPartition => (topicAndPartition, PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
+            .toMap
+          consumer = new SimpleConsumer(broker.host, broker.port, timeout, bufferSize, clientId)
+          val brokerOffsets = consumer
+            .getOffsetsBefore(new OffsetRequest(partitionOffsetInfo))
+            .partitionErrorAndOffsets
+            .map(partitionAndOffset => {
+              if (partitionAndOffset._2.offsets.head <= 0) {
+                debug("Filtering out empty topic partition: %s" format partitionAndOffset)
+              }
+
+              partitionAndOffset
+            })
+            .filter(_._2.offsets.head > 0)
+            // Kafka returns 1 greater than the offset of the last message in 
+            // the topic, so subtract one to fetch the last message.
+            .mapValues(_.offsets.head - 1)
+
+          debug("Got offsets: %s" format brokerOffsets)
+          debug("Shutting down consumer for %s:%s." format (broker.host, broker.port))
+
+          consumer.close
+
+          for ((topicAndPartition, offset) <- brokerOffsets) {
+            offsets += new SystemStreamPartition(systemName, topicAndPartition.topic, new Partition(topicAndPartition.partition)) -> offset.toString
+          }
+        }
+
+        done = true
+      } catch {
+        case e: InterruptedException =>
+          info("Interrupted while fetching last offsets, so forwarding.")
+          if (consumer != null) {
+            consumer.close
+          }
+          throw e
+        case e: Exception =>
+          // Retry.
+          warn("Unable to fetch last offsets for streams due to: %s, %s. Retrying. Turn on debugging to get a full stack trace." format (e.getMessage, streams))
+          debug(e)
+      }
+    }
+
+    info("Got latest offsets for streams: %s, %s" format (streams, offsets))
+
+    offsets
+  }
 }
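
For reference, the new getLastOffsets method can be exercised the way TestKafkaSystemAdmin does below: construct the admin from a broker list and ask for the latest offsets of a set of topics. A minimal sketch (the broker address and topic name are illustrative); empty partitions are filtered out of the returned map:

    import org.apache.samza.system.kafka.KafkaSystemAdmin
    import scala.collection.JavaConversions._

    val admin = new KafkaSystemAdmin("kafka", "localhost:9092")
    // Returns a Map[SystemStreamPartition, String] holding the offset of the
    // last message in every non-empty partition of the given topics.
    val lastOffsets = admin.getLastOffsets(Set("page-view-events"))
    lastOffsets.foreach { case (ssp, offset) => println(ssp + " -> " + offset) }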

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index ba08af8..a11a72a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -103,7 +103,15 @@ class KafkaSystemFactory extends SystemFactory {
     val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
     val brokerListString = Option(producerConfig.brokerList)
       .getOrElse(throw new SamzaException("No broker list defined in config for %s." format systemName))
+    val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
+    val timeout = consumerConfig.socketTimeoutMs
+    val bufferSize = consumerConfig.socketReceiveBufferBytes
 
-    new KafkaSystemAdmin(systemName, brokerListString, clientId)
+    new KafkaSystemAdmin(
+      systemName,
+      brokerListString,
+      timeout,
+      bufferSize,
+      clientId)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-kafka/src/test/scala/org/apache/samza/checkpoint/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/TestKafkaCheckpointManager.scala
index baf4695..cad2231 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/TestKafkaCheckpointManager.scala
@@ -105,6 +105,7 @@ object TestKafkaCheckpointManager {
     Utils.rm(server1.config.logDirs)
     Utils.rm(server2.config.logDirs)
     Utils.rm(server3.config.logDirs)
+    zookeeper.shutdown
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
new file mode 100644
index 0000000..f0c6f8a
--- /dev/null
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -0,0 +1,212 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.system.kafka
+
+import org.junit.Assert._
+import org.junit.Test
+import kafka.zk.EmbeddedZookeeper
+import org.apache.samza.checkpoint.Checkpoint
+import org.junit.BeforeClass
+import org.junit.AfterClass
+import org.apache.samza.util.ClientUtilTopicMetadataStore
+import org.I0Itec.zkclient.ZkClient
+import kafka.admin.AdminUtils
+import org.apache.samza.util.TopicMetadataStore
+import kafka.producer.ProducerConfig
+import kafka.utils.TestUtils
+import kafka.common.ErrorMapping
+import kafka.utils.TestZKUtils
+import kafka.server.KafkaServer
+import kafka.producer.Producer
+import kafka.server.KafkaConfig
+import kafka.utils.Utils
+import org.apache.samza.system.SystemStream
+import kafka.utils.ZKStringSerializer
+import scala.collection.JavaConversions._
+import kafka.producer.KeyedMessage
+import kafka.message.MessageAndMetadata
+import scala.collection.mutable.ArrayBuffer
+import kafka.consumer.Consumer
+import kafka.consumer.ConsumerConfig
+import java.util.Properties
+
+object TestKafkaSystemAdmin {
+  val TOPIC = "input"
+  val TOTAL_PARTITIONS = 50
+  val REPLICATION_FACTOR = 2
+
+  val zkConnect: String = TestZKUtils.zookeeperConnect
+  var zkClient: ZkClient = null
+  val zkConnectionTimeout = 6000
+  val zkSessionTimeout = 6000
+  val brokerId1 = 0
+  val brokerId2 = 1
+  val brokerId3 = 2
+  val ports = TestUtils.choosePorts(3)
+  val (port1, port2, port3) = (ports(0), ports(1), ports(2))
+
+  val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
+  val config1 = new KafkaConfig(props1) {
+    override val hostName = "localhost"
+    override val numPartitions = 1
+    override val zkConnect = TestKafkaSystemAdmin.zkConnect + "/"
+  }
+  val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
+  val config2 = new KafkaConfig(props2) {
+    override val hostName = "localhost"
+    override val numPartitions = 1
+    override val zkConnect = TestKafkaSystemAdmin.zkConnect + "/"
+  }
+  val props3 = TestUtils.createBrokerConfig(brokerId3, port3)
+  val config3 = new KafkaConfig(props3) {
+    override val hostName = "localhost"
+    override val numPartitions = 1
+    override val zkConnect = TestKafkaSystemAdmin.zkConnect + "/"
+  }
+
+  val config = new java.util.Properties()
+  val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3)
+  config.put("metadata.broker.list", brokers)
+  config.put("producer.type", "sync")
+  config.put("request.required.acks", "-1")
+  config.put("serializer.class", "kafka.serializer.StringEncoder");
+  val producerConfig = new ProducerConfig(config)
+  var producer: Producer[String, String] = null
+  var zookeeper: EmbeddedZookeeper = null
+  var server1: KafkaServer = null
+  var server2: KafkaServer = null
+  var server3: KafkaServer = null
+  var metadataStore: TopicMetadataStore = null
+
+  @BeforeClass
+  def beforeSetupServers {
+    zookeeper = new EmbeddedZookeeper(zkConnect)
+    server1 = TestUtils.createServer(config1)
+    server2 = TestUtils.createServer(config2)
+    server3 = TestUtils.createServer(config3)
+    zkClient = new ZkClient(zkConnect + "/", 6000, 6000, ZKStringSerializer)
+    producer = new Producer(producerConfig)
+    metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
+  }
+
+  def createTopic {
+    AdminUtils.createTopic(
+      zkClient,
+      TOPIC,
+      TOTAL_PARTITIONS,
+      REPLICATION_FACTOR)
+  }
+
+  def validateTopic {
+    var done = false
+    var retries = 0
+    val maxRetries = 100
+
+    while (!done && retries < maxRetries) {
+      try {
+        val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(TOPIC), "kafka", metadataStore.getTopicInfo)
+        val topicMetadata = topicMetadataMap(TOPIC)
+        val errorCode = topicMetadata.errorCode
+
+        ErrorMapping.maybeThrowException(errorCode)
+
+        done = true
+      } catch {
+        case e: Throwable =>
+          System.err.println("Got exception while validating test topics. Waiting and retrying. " + e)
+          retries += 1
+          Thread.sleep(500)
+      }
+    }
+
+    if (retries >= maxRetries) {
+      fail("Unable to successfully create topics. Tried to validate %s times." format retries)
+    }
+  }
+
+  def getConsumerConnector = {
+    val props = new Properties
+
+    props.put("zookeeper.connect", zkConnect)
+    props.put("group.id", "test")
+    props.put("auto.offset.reset", "smallest")
+
+    val consumerConfig = new ConsumerConfig(props)
+    Consumer.create(consumerConfig)
+  }
+
+  @AfterClass
+  def afterCleanLogDirs {
+    server1.shutdown
+    server1.awaitShutdown()
+    server2.shutdown
+    server2.awaitShutdown()
+    server3.shutdown
+    server3.awaitShutdown()
+    Utils.rm(server1.config.logDirs)
+    Utils.rm(server2.config.logDirs)
+    Utils.rm(server3.config.logDirs)
+    zkClient.close
+    zookeeper.shutdown
+  }
+}
+
+class TestKafkaSystemAdmin {
+  import TestKafkaSystemAdmin._
+
+  @Test
+  def testShouldGetLastOffsets {
+    val systemName = "test"
+    val systemAdmin = new KafkaSystemAdmin(systemName, brokers)
+
+    // Get a non-existent topic.
+    val initialInputOffsets = systemAdmin.getLastOffsets(Set("foo"))
+    assertEquals(0, initialInputOffsets.size)
+
+    // Create an empty topic with 50 partitions, but with no offsets.
+    createTopic
+    validateTopic
+    val createdInputOffsets = systemAdmin.getLastOffsets(Set(TOPIC))
+    assertEquals(0, createdInputOffsets.size)
+
+    // Add a new message to one of the partitions.
+    producer.send(new KeyedMessage(TOPIC, "key1", "val1"))
+    val oneMessageInputOffsets = systemAdmin.getLastOffsets(Set(TOPIC))
+    val ssp = oneMessageInputOffsets.keySet.head
+    assertEquals(1, oneMessageInputOffsets.size)
+    assertEquals(systemName, ssp.getSystem)
+    assertEquals(TOPIC, ssp.getStream)
+    // key1 gets hash-mod'd to partition 48.
+    assertEquals(48, ssp.getPartition.getPartitionId)
+
+    // Validate that a fetch will return the message.
+    val connector = getConsumerConnector
+    var stream = connector.createMessageStreams(Map(TOPIC -> 1)).get(TOPIC).get.get(0).iterator
+    val message = stream.next
+    val text = new String(message.message, "UTF-8")
+    connector.shutdown
+    // Message's offset should match the expected latest offset.
+    assertEquals(oneMessageInputOffsets(ssp), message.offset.toString)
+    assertEquals("val1", text)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
index 7d4e20a..9602a52 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.samza.test.integration
 
 import org.apache.samza.task.StreamTask
@@ -175,6 +194,7 @@ object TestStatefulTask {
     Utils.rm(server2.config.logDirs)
     Utils.rm(server3.config.logDirs)
     zkClient.close
+    zookeeper.shutdown
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
index 6805052..c9f7029 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
@@ -41,6 +41,7 @@ import TestSamzaAppMasterTaskManager._
 import org.apache.samza.system.SystemAdmin
 import org.apache.samza.system.SystemFactory
 import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.SamzaException
 
 object TestSamzaAppMasterTaskManager {
   def getContainer(containerId: ContainerId) = new Container {
@@ -428,4 +429,6 @@ class MockSystemFactory extends SystemFactory {
 
 class MockSinglePartitionManager extends SystemAdmin {
   def getPartitions(streamName: String) = Set(new Partition(0))
+  
+  def getLastOffsets(streams: java.util.Set[String]) = throw new SamzaException("Need to implement this")
 }


[2/2] git commit: SAMZA-2; adding pluggable message chooser.

Posted by cr...@apache.org.
SAMZA-2; adding pluggable message chooser.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/7bbde2cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/7bbde2cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/7bbde2cc

Branch: refs/heads/master
Commit: 7bbde2cc288050231c565d308c00cea4287cab4f
Parents: bb0abb6
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Fri Oct 4 15:09:40 2013 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Fri Oct 4 15:09:40 2013 -0700

----------------------------------------------------------------------
 build.gradle                                    |  10 +
 .../documentation/0.7.0/container/streams.md    |  98 ++++++
 .../org/apache/samza/system/MessageChooser.java |  26 --
 .../org/apache/samza/system/SystemAdmin.java    |   4 +-
 .../system/chooser/BaseMessageChooser.java      |  37 +++
 .../samza/system/chooser/MessageChooser.java    | 127 ++++++++
 .../system/chooser/MessageChooserFactory.java   |  27 ++
 .../samza/util/SinglePartitionSystemAdmin.java  |   9 +-
 .../samza/config/DefaultChooserConfig.scala     |  49 +++
 .../org/apache/samza/config/TaskConfig.scala    |   3 +
 .../apache/samza/container/SamzaContainer.scala |  15 +-
 .../apache/samza/system/DefaultChooser.scala    |  28 --
 .../apache/samza/system/SystemConsumers.scala   |  16 +-
 .../samza/system/chooser/BatchingChooser.scala  | 149 +++++++++
 .../system/chooser/BootstrappingChooser.scala   | 207 +++++++++++++
 .../samza/system/chooser/DefaultChooser.scala   | 309 +++++++++++++++++++
 .../system/chooser/RoundRobinChooser.scala      |  95 ++++++
 .../system/chooser/TieredPriorityChooser.scala  | 170 ++++++++++
 .../samza/system/TestSystemConsumers.scala      |  42 +++
 .../system/chooser/MockMessageChooser.scala     |  50 +++
 .../system/chooser/TestBatchingChooser.scala    |  94 ++++++
 .../chooser/TestBootstrappingChooser.scala      | 176 +++++++++++
 .../system/chooser/TestDefaultChooser.scala     | 124 ++++++++
 .../system/chooser/TestRoundRobinChooser.scala  |  95 ++++++
 .../chooser/TestTieredPriorityChooser.scala     | 248 +++++++++++++++
 .../samza/system/kafka/KafkaSystemAdmin.scala   | 118 ++++++-
 .../samza/system/kafka/KafkaSystemFactory.scala |  10 +-
 .../checkpoint/TestKafkaCheckpointManager.scala |   1 +
 .../system/kafka/TestKafkaSystemAdmin.scala     | 212 +++++++++++++
 .../test/integration/TestStatefulTask.scala     |  20 ++
 .../yarn/TestSamzaAppMasterTaskManager.scala    |   3 +
 31 files changed, 2502 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 077e44b..cb96683 100644
--- a/build.gradle
+++ b/build.gradle
@@ -75,6 +75,11 @@ project(":samza-kafka_$scalaVersion") {
     testCompile files("lib/kafka_$scalaVersion-$kafkaVersion-test.jar")
     // end these can all go away when kafka is in maven
   }
+
+  test {
+    // Bump up the heap so we can start ZooKeeper and Kafka brokers.
+    maxHeapSize = "1024m"
+  }
 }
 
 project(":samza-serializers_$scalaVersion") {
@@ -179,5 +184,10 @@ project(":samza-test_$scalaVersion") {
     testCompile project(":samza-core_$scalaVersion")
     testCompile project(":samza-kafka_$scalaVersion")
   }
+
+  test {
+    // Bump up the heap so we can start ZooKeeper and Kafka brokers.
+    maxHeapSize = "1024m"
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/docs/learn/documentation/0.7.0/container/streams.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/container/streams.md b/docs/learn/documentation/0.7.0/container/streams.md
index b15c34d..fc65ff7 100644
--- a/docs/learn/documentation/0.7.0/container/streams.md
+++ b/docs/learn/documentation/0.7.0/container/streams.md
@@ -53,4 +53,102 @@ When the TaskRunner starts up, it will use the stream-related properties in your
 
 In the process method in StreamTask, there is a MessageCollector parameter for you to use. When the TaskRunner calls process() on one of your StreamTask instances, it provides the collector. After the process() method completes, the TaskRunner takes any output messages that your StreamTask wrote to the collector, serializes them, and calls the send() method on the appropriate SystemProducer.
 
+### Message Ordering
+
+If a job is consuming messages from more than one system/stream/partition combination, messages will be processed in a round robin fashion by default. For example, if a job is reading partitions 1 and 2 of page-view-events from a Kafka system, and there are messages available to be processed from both partitions, your StreamTask will get messages in round robin order (partition 1, partition 2, partition 1, partition 2, etc). If no message is available for a given partition, that partition is skipped until a message becomes available.
+
+#### MessageChooser
+
+The default round robin behavior can be overridden by implementing a custom MessageChooser. A MessageChooser's job is to answer the question, "Given a set of incoming messages, which one should a Samza container process next?".  To write a custom MessageChooser, take a look at the [Javadocs](http://localhost:4000/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/MessageChooser.html), and then configure your task with the "task.chooser.class" configuration, which should point to your MessageChooserFactory.
+
+Out of the box, Samza ships with a RoundRobinChooser, which is the default. You can use your own chooser by adding the following configuration to your job.
+
+```
+task.chooser.class=org.apache.samza.system.YourStreamChooserFactory
+```
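+
+For illustration, here is a minimal sketch of what a custom chooser and its factory might look like. The update, choose, register, start, and stop methods mirror how the choosers in this patch are exercised; the class names and the getChooser(config, registry) factory signature are assumptions for the example rather than the definitive API, so check the MessageChooserFactory Javadocs for the exact shape.
+
+```
+import org.apache.samza.config.Config
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.system.{ IncomingMessageEnvelope, SystemStreamPartition }
+import org.apache.samza.system.chooser.{ MessageChooser, MessageChooserFactory }
+
+// Hypothetical chooser that simply drains envelopes in arrival order.
+class YourStreamChooser extends MessageChooser {
+  var pending = List[IncomingMessageEnvelope]()
+
+  def register(systemStreamPartition: SystemStreamPartition, lastReadOffset: String) {}
+  def start {}
+  def stop {}
+
+  // Buffer an incoming envelope until the container asks for it.
+  def update(envelope: IncomingMessageEnvelope) {
+    pending = pending :+ envelope
+  }
+
+  // Return the next buffered envelope, or null if nothing is available.
+  def choose: IncomingMessageEnvelope = pending match {
+    case head :: tail =>
+      pending = tail
+      head
+    case Nil => null
+  }
+}
+
+// Assumed factory shape, referenced by task.chooser.class.
+class YourStreamChooserFactory extends MessageChooserFactory {
+  def getChooser(config: Config, registry: MetricsRegistry): MessageChooser = new YourStreamChooser
+}
+```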
+
+#### Prioritizing
+
+There are certain times when messages from a stream should be favored over messages from any other stream. For example, some Samza jobs consume two streams: one stream is fed by a real-time system and the other stream is fed by a batch system. A typical pattern is to have a Samza processor with a statistical model that is ranking a real-time feed of data. Periodically, this model needs to be retrained and updated. The Samza processor can be re-deployed with the new model, but how do you re-process all of the old data that the processor has already seen? This can be accomplished by having a batch system send messages to the Samza processor for any data that needs to be re-processed. In this example, you'd like to favor the real-time system over the batch system whenever messages are available from the real-time system. Always processing the real-time messages first prevents latency from being introduced into the real-time feed, even while the batch system is sending messages.
+
+Samza provides a mechanism to prioritize one stream over another by setting this value: systems.&lt;system&gt;.streams.&lt;stream&gt;.samza.priority=2. A config snippet illustrates the settings:
+
+```
+systems.kafka.streams.my-stream.samza.priority=2
+systems.kafka.streams.my-other-stream.samza.priority=1
+```
+
+This declares that my-stream's messages will be processed before my-other-stream's. If my-stream has no messages available at the moment (because more are still being read in, for instance), then my-other-stream's messages will get processed.
+
+Each priority level gets its own MessageChooser. In the example above, one MessageChooser is used for my-stream, and another is used for my-other-stream. The MessageChooser for my-other-stream will only be used when my-stream's MessageChooser doesn't return a message to process. 
+
+It is also valid to define two streams with the same priority. If messages are available from two streams at the same priority level, it's up to the MessageChooser for that priority level to decide which message should be processed first.
+
+It's also valid to only define priorities for some streams. All non-prioritized streams will be treated as the lowest priority, and will share a single default MessageChooser. If you had my-third-stream as a third input stream in the example above, it would be treated as the lowest priority and handled by that shared default MessageChooser.
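+
+The priority configuration corresponds to the new TieredPriorityChooser, which keeps one wrapped chooser per priority tier plus a default chooser for non-prioritized streams. The sketch below mirrors how the tests in this patch construct one directly; the stream names are illustrative and plain RoundRobinChoosers stand in for the per-tier choosers.
+
+```
+import org.apache.samza.system.SystemStream
+import org.apache.samza.system.chooser.{ MessageChooser, RoundRobinChooser, TieredPriorityChooser }
+
+// my-stream (priority 2) is always asked before my-other-stream (priority 1).
+val priorities = Map(
+  new SystemStream("kafka", "my-stream") -> 2,
+  new SystemStream("kafka", "my-other-stream") -> 1)
+
+// One chooser per priority tier.
+val tierChoosers = Map[Int, MessageChooser](
+  2 -> new RoundRobinChooser,
+  1 -> new RoundRobinChooser)
+
+// Non-prioritized streams fall through to the default chooser.
+val chooser = new TieredPriorityChooser(priorities, tierChoosers, new RoundRobinChooser)
+```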
+
+#### Bootstrapping
+
+Some Samza jobs wish to fully consume a stream from offset 0 all the way through to the last message in the stream before they process messages from any other stream. This is useful for streams that have some key-value data that a Samza job wishes to use when processing messages from another stream. This is called bootstrapping a stream.
+
+Consider a case where you want to read a currency-code stream, which has mappings of currency code (e.g. USD) to symbol (e.g. $), and is partitioned by currency code. You might want to join these symbols to a stream called transactions, which is also partitioned by currency code and has a schema like {"currency": "USD", "amount": 1234}. You could then have your StreamTask join the currency symbol to each transaction message, and emit messages like {"amount": "$1234"}.
+
+To bootstrap the currency-code stream, you need to read it from offset 0 all the way to the last message in the stream (what I'm calling head). It is not desirable to read any message from the transactions stream until the currency-code stream has been fully read, or else you might try to join a transaction message to a currency code whose symbol hasn't yet been read.
+
+Samza supports this style of processing with the systems.&lt;system&gt;.streams.&lt;stream&gt;.samza.bootstrap property.
+
+```
+systems.kafka.streams.currency-code.samza.bootstrap=true
+```
+
+This configuration tells Samza that currency-code's messages should be read from the last checkpointed offset all the way until the stream is caught up to "head", before any other message is processed. If you wish to process all messages in currency-code from offset 0 to head, you can define:
+
+```
+systems.kafka.streams.currency-code.samza.bootstrap=true
+systems.kafka.streams.currency-code.samza.reset.offset=true
+```
+
+This tells Samza to start from the beginning of the currency-code stream, and read all the way to head.
+
+The difference between prioritizing a stream and bootstrapping a stream is that a high-priority stream still allows lower-priority stream messages to be processed when no messages are available for the high-priority stream. In the case of bootstrapping, no other streams are processed at all until all messages in the bootstrap stream have been read, up to the last message.
+
+Once a bootstrap stream has been fully consumed ("caught up"), it is treated like a normal stream, and no bootstrapping logic happens.
+
+It is valid to define multiple bootstrap streams.
+
+```
+systems.kafka.streams.currency-code.samza.bootstrap=true
+systems.kafka.streams.other-bootstrap-stream.samza.bootstrap=true
+```
+
+In this case, currency-code and other-bootstrap-stream will both be processed before any other stream is processed. The order of message processing (the bootstrap order) between currency-code and other-bootstrap-stream is up to the MessageChooser. If you want to fully process one bootstrap stream before another, you can use priorities:
+
+```
+systems.kafka.streams.currency-code.samza.bootstrap=true
+systems.kafka.streams.currency-code.samza.priority=2
+systems.kafka.streams.other-bootstrap-stream.samza.bootstrap=true
+systems.kafka.streams.other-bootstrap-stream.samza.priority=1
+```
+
+This defines a specific bootstrap ordering: fully bootstrap currency-code before bootstrapping other-bootstrap-stream.
+
+Lastly, bootstrap and non-bootstrap prioritized streams can be mixed:
+
+```
+systems.kafka.streams.currency-code.samza.bootstrap=true
+systems.kafka.streams.non-bootstrap-stream.samza.priority=2
+systems.kafka.streams.other-non-bootstrap-stream.samza.priority=1
+```
+
+Bootstrap streams are assigned a priority of Int.MaxValue by default, so they will always be prioritized over any other prioritized stream. In this case, currency-code will be fully bootstrapped, and then treated as the highest priority stream (Int.MaxValue). The next highest priority stream will be non-bootstrap-stream (priority 2), followed by other-non-bootstrap-stream (priority 1), and then any non-bootstrap/non-prioritized streams.
+
+#### Batching
+
+There are cases where consuming from the same SystemStreamPartition repeatedly leads to better performance. Samza allows for consumer batching to satisfy this use case. For example, if you had two SystemStreamPartitions, SSP1 and SSP2, you might wish to read 100 messages from SSP1 and then one from SSP2, regardless of the MessageChooser that's used. This can be accomplished with:
+
+```
+task.consumer.batch.size=100
+```
+
+With this setting, Samza will always try to read a message from the last SystemStreamPartition that was read. This behavior will continue until no message is available for that SystemStreamPartition, or the batch size has been reached. In either of these cases, Samza will defer to the MessageChooser to determine the next message to process. It will then try to stick with the new message's SystemStreamPartition again.
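+
+Batching composes with prioritization and bootstrapping, since it only controls how long Samza sticks with the SystemStreamPartition that was last chosen. As a rough example (reusing the stream names from the earlier sections), the following would bootstrap currency-code first, then prefer my-stream over other streams, reading up to 100 messages at a time from whichever partition was last picked:
+
+```
+task.consumer.batch.size=100
+systems.kafka.streams.currency-code.samza.bootstrap=true
+systems.kafka.streams.my-stream.samza.priority=1
+```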
+
 ## [Checkpointing &raquo;](checkpointing.html)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-api/src/main/java/org/apache/samza/system/MessageChooser.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/MessageChooser.java b/samza-api/src/main/java/org/apache/samza/system/MessageChooser.java
deleted file mode 100644
index 306b290..0000000
--- a/samza-api/src/main/java/org/apache/samza/system/MessageChooser.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.system;
-
-public interface MessageChooser {
-  void update(IncomingMessageEnvelope envelopes);
-
-  IncomingMessageEnvelope choose();
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
index c902d41..e8144e9 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
@@ -19,10 +19,12 @@
 
 package org.apache.samza.system;
 
+import java.util.Map;
 import java.util.Set;
-
 import org.apache.samza.Partition;
 
 public interface SystemAdmin {
   Set<Partition> getPartitions(String streamName);
+
+  Map<SystemStreamPartition, String> getLastOffsets(Set<String> streams);
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-api/src/main/java/org/apache/samza/system/chooser/BaseMessageChooser.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/chooser/BaseMessageChooser.java b/samza-api/src/main/java/org/apache/samza/system/chooser/BaseMessageChooser.java
new file mode 100644
index 0000000..1c280c9
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/chooser/BaseMessageChooser.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.chooser;
+
+import org.apache.samza.system.SystemStreamPartition;
+
+/**
+ * An abstract MessageChooser that implements start/stop/register for choosers
+ * that don't use them.
+ */
+abstract public class BaseMessageChooser implements MessageChooser {
+  public void start() {
+  }
+
+  public void stop() {
+  }
+
+  public void register(SystemStreamPartition systemStreamPartition, String lastReadOffset) {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-api/src/main/java/org/apache/samza/system/chooser/MessageChooser.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/chooser/MessageChooser.java b/samza-api/src/main/java/org/apache/samza/system/chooser/MessageChooser.java
new file mode 100644
index 0000000..647b7f7
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/chooser/MessageChooser.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.chooser;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+
+/**
+ * MessageChooser is an interface for programmatic fine-grain control over
+ * stream consumption.
+ * 
+ * Consider the case where a Samza task is consuming multiple streams, and some
+ * streams may be from live systems that have stricter SLA requirements and
+ * must always be prioritized over other streams that may be from batch systems.
+ * MessageChooser allows developers to inject message prioritization logic into
+ * the SamzaContainer.
+ * 
+ * In general, the MessageChooser can be used to prioritize certain systems,
+ * streams or partitions over others. It can also be used to throttle certain
+ * partitions if it chooses not to return messages even though they are
+ * available when choose is invoked. The MessageChooser can also throttle the
+ * entire SamzaContainer by performing a blocking operation, such as
+ * Thread.sleep.
+ * 
+ * The manner in which MessageChooser is used is:
+ * 
+ * <ul>
+ * <li>SystemConsumers buffers messages from all SystemStreamPartitions as they
+ * become available.</li>
+ * <li>If MessageChooser has no messages for a given SystemStreamPartition, and
+ * SystemConsumers has a message in its buffer for the SystemStreamPartition,
+ * the MessageChooser will be updated once with the next message in the buffer.</li>
+ * <li>When SamzaContainer is ready to process another message, it calls
+ * SystemConsumers.choose, which in-turn calls MessageChooser.choose.</li>
+ * </ul>
+ * 
+ * Since the MessageChooser only receives one message at a time per
+ * SystemStreamPartition, it can be used to order messages between different
+ * SystemStreamPartitions, but it can't be used to re-order messages within a
+ * single SystemStreamPartition (a buffered sort). This must be done within a
+ * StreamTask.
+ * 
+ * The contract between the MessageChooser and the SystemConsumers is:
+ * 
+ * <ul>
+ * <li>Update can be called multiple times before choose is called.</li>
+ * <li>A null return from MessageChooser.choose means no envelopes should be
+ * processed at the moment.</li>
+ * <li>A MessageChooser may elect to return null when choose is called, even if
+ * unprocessed messages have been given by the update method.</li>
+ * <li>A MessageChooser will not have any of its in-memory state restored in the
+ * event of a failure.</li>
+ * <li>Blocking operations (such as Thread.sleep) will block all processing in
+ * the entire SamzaContainer.</li>
+ * <li>A MessageChooser should never return the same envelope more than once.</li>
+ * <li>Non-deterministic (e.g. time-based) MessageChoosers are allowed.</li>
+ * <li>A MessageChooser does not need to be thread-safe.</li>
+ * </ul>
+ */
+public interface MessageChooser {
+  /**
+   * Called after all SystemStreamPartitions have been registered. Start is used
+   * to notify the chooser that it will start receiving update and choose calls.
+   */
+  void start();
+
+  /**
+   * Called when the chooser is about to be discarded. No more messages will be
+   * given to the chooser after it is stopped.
+   */
+  void stop();
+
+  /**
+   * Called before start, to let the chooser know that it will be handling
+   * envelopes from the given SystemStreamPartition. Register will only be
+   * called before start.
+   * 
+   * @param systemStreamPartition
+   *          A SystemStreamPartition that envelopes will be coming from.
+   * @param lastReadOffset
+   *          The last offset successfully checkpointed for this
+   *          systemStreamPartition.
+   */
+  void register(SystemStreamPartition systemStreamPartition, String lastReadOffset);
+
+  /**
+   * Notify the chooser that a new envelope is available for processing. A
+   * MessageChooser will receive, at most, one outstanding envelope per
+   * system/stream/partition combination. For example, if update is called for
+   * partition 7 of kafka.mystream, then update will not be called with an
+   * envelope from partition 7 of kafka.mystream until the previous envelope has
+   * been returned via the choose method. Update will only be invoked after the
+   * chooser has been started.
+   * 
+   * @param envelope
+   *          An unprocessed envelope.
+   */
+  void update(IncomingMessageEnvelope envelope);
+
+  /**
+   * The choose method is invoked when the SamzaContainer is ready to process a
+   * new message. The chooser may elect to return any envelope that it's been
+   * given via the update method, which hasn't yet been returned. Choose will
+   * only be called after the chooser has been started.
+   * 
+   * @return The next envelope to process, or null if the chooser has no
+   *         messages or doesn't want to process any at the moment.
+   */
+  IncomingMessageEnvelope choose();
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-api/src/main/java/org/apache/samza/system/chooser/MessageChooserFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/chooser/MessageChooserFactory.java b/samza-api/src/main/java/org/apache/samza/system/chooser/MessageChooserFactory.java
new file mode 100644
index 0000000..6442db9
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/chooser/MessageChooserFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.chooser;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+
+public interface MessageChooserFactory {
+  MessageChooser getChooser(Config config, MetricsRegistry registry);
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-api/src/main/java/org/apache/samza/util/SinglePartitionSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionSystemAdmin.java b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionSystemAdmin.java
index e4ed30b..75ec26a 100644
--- a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionSystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionSystemAdmin.java
@@ -21,10 +21,12 @@ package org.apache.samza.util;
 
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
-
 import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
 import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemStreamPartition;
 
 /**
  * A simple helper admin class that defines a single partition (partition 0) for
@@ -43,4 +45,9 @@ public class SinglePartitionSystemAdmin implements SystemAdmin {
   public Set<Partition> getPartitions(String streamName) {
     return Collections.unmodifiableSet(ONE_PARTITION);
   }
+
+  @Override
+  public Map<SystemStreamPartition, String> getLastOffsets(Set<String> streams) {
+    throw new SamzaException("Method unsupported for single partition admin.");
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-core/src/main/scala/org/apache/samza/config/DefaultChooserConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/DefaultChooserConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/DefaultChooserConfig.scala
new file mode 100644
index 0000000..5fb92cf
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/config/DefaultChooserConfig.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+
+import org.apache.samza.system.SystemStream
+import TaskConfig._
+
+object DefaultChooserConfig {
+  val BOOTSTRAP = StreamConfig.STREAM_PREFIX + "samza.bootstrap"
+  val PRIORITY = StreamConfig.STREAM_PREFIX + "samza.priority"
+  val BATCH_SIZE = "task.consumer.batch.size"
+
+  implicit def Config2DefaultChooser(config: Config) = new DefaultChooserConfig(config)
+}
+
+class DefaultChooserConfig(config: Config) extends ScalaMapConfig(config) {
+  import DefaultChooserConfig._
+
+  def getChooserBatchSize = getOption(BATCH_SIZE)
+
+  def getBootstrapStreams = config
+    .getInputStreams
+    .map(systemStream => (systemStream, getOrElse(BOOTSTRAP format (systemStream.getSystem, systemStream.getStream), "false").equals("true")))
+    .filter(_._2.equals("true"))
+    .map(_._1)
+
+  def getPriorityStreams = config
+    .getInputStreams
+    .map(systemStream => (systemStream, getOrElse(PRIORITY format (systemStream.getSystem, systemStream.getStream), "-1").toInt))
+    .filter(_._2 >= 0)
+    .toMap
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index 0c742d8..9c4370f 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -35,6 +35,7 @@ object TaskConfig {
   val LIFECYCLE_LISTENER = "task.lifecycle.listener.%s.class" // task.lifecycle.listener.li-generator.class
   val CHECKPOINT_MANAGER_FACTORY = "task.checkpoint.factory" // class name to use when sending offset checkpoints
   val TASK_JMX_ENABLED = "task.jmx.enabled" // Start up a JMX server for this task?
+  val MESSAGE_CHOOSER_CLASS_NAME = "task.chooser.class"
 
   implicit def Config2Task(config: Config) = new TaskConfig(config)
 }
@@ -72,4 +73,6 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) {
   def getCheckpointManagerFactory() = getOption(TaskConfig.CHECKPOINT_MANAGER_FACTORY)
 
   def getJmxServerEnabled = getBoolean(TaskConfig.TASK_JMX_ENABLED, true)
+  
+  def getMessageChooserClass = getOption(TaskConfig.MESSAGE_CHOOSER_CLASS_NAME)
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 62bd243..6337a2a 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -53,11 +53,14 @@ import org.apache.samza.util.Util
 import org.apache.samza.task.ReadableCoordinator
 import org.apache.samza.system.SystemProducers
 import org.apache.samza.task.ReadableCollector
-import org.apache.samza.system.DefaultChooser
 import org.apache.samza.system.SystemConsumers
+import org.apache.samza.system.chooser.MessageChooser
+import org.apache.samza.system.chooser.MessageChooserFactory
 import org.apache.samza.system.SystemProducersMetrics
 import org.apache.samza.system.SystemConsumersMetrics
 import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.system.chooser.DefaultChooser
+import org.apache.samza.system.chooser.RoundRobinChooserFactory
 
 object SamzaContainer extends Logging {
   def main(args: Array[String]) {
@@ -121,6 +124,10 @@ object SamzaContainer extends Logging {
       (systemName, Util.getObj[SystemFactory](systemFactoryClassName))
     }).toMap
 
+    val systemAdmins = systemNames
+      .map(systemName => (systemName, systemFactories(systemName).getAdmin(systemName, config)))
+      .toMap
+
     info("Got system factories: %s" format systemFactories.keys)
 
     val consumers = inputSystems
@@ -235,7 +242,11 @@ object SamzaContainer extends Logging {
 
     info("Setting up message chooser.")
 
-    val chooser = new DefaultChooser
+    val chooserFactoryClassName = config.getMessageChooserClass.getOrElse(classOf[RoundRobinChooserFactory].getName)
+
+    val chooserFactory = Util.getObj[MessageChooserFactory](chooserFactoryClassName)
+
+    val chooser = DefaultChooser(systemAdmins, chooserFactory, config, samzaContainerMetrics.registry)
 
     info("Setting up metrics reporters.")
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-core/src/main/scala/org/apache/samza/system/DefaultChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/DefaultChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/DefaultChooser.scala
deleted file mode 100644
index 5a72e7a..0000000
--- a/samza-core/src/main/scala/org/apache/samza/system/DefaultChooser.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.system
-
-import java.util.ArrayDeque
-
-class DefaultChooser extends MessageChooser {
-  var q = new ArrayDeque[IncomingMessageEnvelope]()
-  def update(envelope: IncomingMessageEnvelope) = q.add(envelope)
-  def choose = q.poll
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index d24671e..5cbffe5 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -21,15 +21,14 @@ package org.apache.samza.system
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.Queue
-
 import org.apache.samza.serializers.SerdeManager
-
 import grizzled.slf4j.Logging
+import org.apache.samza.system.chooser.MessageChooser
 
 class SystemConsumers(
   chooser: MessageChooser,
   consumers: Map[String, SystemConsumer],
-  serdeManager: SerdeManager,
+  serdeManager: SerdeManager = new SerdeManager,
   metrics: SystemConsumersMetrics = new SystemConsumersMetrics,
   maxMsgsPerStreamPartition: Int = 1000,
   noNewMessagesTimeout: Long = 10) extends Logging {
@@ -52,13 +51,21 @@ class SystemConsumers(
   def start {
     debug("Starting consumers.")
 
+    consumers
+      .keySet
+      .foreach(metrics.registerSystem)
+
     consumers.values.foreach(_.start)
+
+    chooser.start
   }
 
   def stop {
     debug("Stopping consumers.")
 
     consumers.values.foreach(_.stop)
+
+    chooser.stop
   }
 
   def register(systemStreamPartition: SystemStreamPartition, lastReadOffset: String) {
@@ -68,8 +75,7 @@ class SystemConsumers(
     fetchMap += systemStreamPartition -> maxMsgsPerStreamPartition
     unprocessedMessages += systemStreamPartition -> Queue[IncomingMessageEnvelope]()
     consumers(systemStreamPartition.getSystem).register(systemStreamPartition, lastReadOffset)
-
-    metrics.registerSystem(systemStreamPartition.getSystem)
+    chooser.register(systemStreamPartition, lastReadOffset)
   }
 
   def choose = {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-core/src/main/scala/org/apache/samza/system/chooser/BatchingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/BatchingChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/BatchingChooser.scala
new file mode 100644
index 0000000..b265454
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/BatchingChooser.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.chooser
+
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.metrics.MetricsHelper
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.metrics.MetricsRegistry
+import grizzled.slf4j.Logging
+
+/**
+ * BatchingChooser provides a batching functionality on top of an existing
+ * MessageChooser. This is useful in cases where batching leads to better
+ * performance.
+ *
+ * BatchingChooser wraps an underlying MessageChooser. Initially,
+ * BatchingChooser simply forwards update and choose calls to the wrapped
+ * MessageChooser. The first time that the wrapped MessageChooser returns a
+ * non-null envelope, BatchingChooser holds on to the SystemStreamPartition
+ * from the envelope.
+ *
+ * As future envelopes arrive via BatchingChooser.update, BatchingChooser
+ * looks at the SSP for each envelope. If the SSP is the same as the SSP for
+ * the last envelope that was returned by the wrapped MessageChooser's choose
+ * method, BatchingChooser caches the new envelope rather than forwarding it to
+ * the wrapped chooser. The next time choose is called, BatchingChooser will
+ * return the cached envelope, if it is non-null, instead of calling choose on
+ * the wrapped MessageChooser.
+ *
+ * BatchingChooser keeps doing this until the batch size limit has been reached,
+ * or BatchingChooser.choose is called, and no envelope is available for the
+ * batched SSP. In either of these cases, the batch is reset, and a new SSP is
+ * chosen.
+ *
+ * This class depends on the contract defined in MessageChooser. Specifically,
+ * it only works with one envelope per SystemStreamPartition at a time.
+ */
+class BatchingChooser(
+  wrapped: MessageChooser,
+  batchSize: Int = 100,
+  metrics: BatchingChooserMetrics = new BatchingChooserMetrics) extends MessageChooser with Logging {
+
+  var preferredSystemStreamPartition: SystemStreamPartition = null
+  var preferredEnvelope: IncomingMessageEnvelope = null
+  var batchCount = 0
+
+  def update(envelope: IncomingMessageEnvelope) {
+    // If we get an envelope for the SSP we're batching, hold on to it so we 
+    // can bypass the wrapped chooser, and forcibly return it when choose is 
+    // called.
+    if (envelope.getSystemStreamPartition.equals(preferredSystemStreamPartition)) {
+      debug("Bypassing wrapped.update to cache preferred envelope: %s" format preferredEnvelope)
+
+      preferredEnvelope = envelope
+    } else {
+      trace("No preferred envelope, so updating wrapped chooser.")
+
+      wrapped.update(envelope)
+    }
+  }
+
+  /**
+   * Chooses the envelope for the SSP that's currently being batched. If no
+   * SSP is currently being batched, then this method falls back to calling
+   * MessageChooser.choose on the wrapped MessageChooser. If the wrapped
+   * MessageChooser returns a non-null envelope, then the SSP for this envelope
+   * will become the new batched SSP, and BatchingChooser will choose envelopes
+   * for this SSP as long as they're available.
+   */
+  def choose = {
+    if (preferredEnvelope == null) {
+      val envelope = wrapped.choose
+
+      // Change preferred SSP to the envelope's SSP, so we can keep batching 
+      // on this SSP (as long as an envelope is available).
+      if (envelope != null) {
+        setPreferredSystemStreamPartition(envelope)
+      }
+
+      envelope
+    } else {
+      val envelope = preferredEnvelope
+      preferredEnvelope = null
+      batchCount += 1
+
+      trace("Have preferred envelope: %s" format envelope)
+
+      // If we've hit our batching threshold, reset the batch to give other 
+      // SSPs a chance to get picked by the wrapped chooser.
+      if (batchCount >= batchSize) {
+        resetBatch
+      }
+
+      envelope
+    }
+  }
+
+  private def setPreferredSystemStreamPartition(envelope: IncomingMessageEnvelope) {
+    debug("Setting preferred system stream partition to: %s" format envelope.getSystemStreamPartition)
+
+    // Set batch count to 1 since the envelope we're returning is the 
+    // first one in the batch.
+    batchCount = 1
+    preferredSystemStreamPartition = envelope.getSystemStreamPartition
+  }
+
+  private def resetBatch() {
+    debug("Resetting batch due to max batch size limit of: %s" format batchSize)
+    metrics.batches.inc
+    batchCount = 0
+    preferredSystemStreamPartition = null
+  }
+
+  def start = {
+    debug("Starting batching chooser with batch size of: %s" format batchSize)
+    metrics.setBatchedEnvelopes(() => batchCount)
+    wrapped.start
+  }
+
+  def stop = wrapped.stop
+
+  def register(systemStreamPartition: SystemStreamPartition, lastReadOffset: String) = wrapped.register(systemStreamPartition, lastReadOffset)
+}
+
+class BatchingChooserMetrics(val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
+  val batches = newCounter("batch-resets")
+
+  def setBatchedEnvelopes(getValue: () => Int) {
+    newGauge("batched-envelopes", getValue)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
new file mode 100644
index 0000000..2517e16
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.chooser
+
+import java.util.concurrent.atomic.AtomicInteger
+import org.apache.samza.system.SystemStream
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.system.IncomingMessageEnvelope
+import grizzled.slf4j.Logging
+import org.apache.samza.metrics.MetricsHelper
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.metrics.MetricsRegistry
+
+/**
+ * BootstrappingChooser is a composable MessageChooser that only chooses
+ * an envelope when it's received at least one envelope for each SystemStream.
+ * It does this by only allowing wrapped.choose to be called when the wrapped
+ * MessageChooser has been updated with at least one envelope for every
+ * SystemStream defined in the latestMessageOffsets map. Thus, the guarantee
+ * is that the wrapped chooser will have an envelope from each SystemStream
+ * whenever it has to make a choice about which envelope to process next.
+ *
+ * This behavior continues for each SystemStream that has lagging partitions.
+ * As a SystemStream catches up to head, it is no longer marked as lagging,
+ * and the requirement that the wrapped chooser have an envelope from the
+ * SystemStream is dropped. Once all SystemStreams have caught up, this
+ * MessageChooser just becomes a pass-through that always delegates to the
+ * wrapped chooser.
+ *
+ * If a SystemStream falls behind after the initial catch-up, this chooser
+ * makes no effort to catch the SystemStream back up, again.
+ */
+class BootstrappingChooser(
+  /**
+   * The message chooser that BootstrappingChooser delegates to when it's
+   * updating or choosing envelopes.
+   */
+  wrapped: MessageChooser,
+
+  /**
+   * A map from SSP to latest offset for each SSP. If a stream does not need
+   * to be guaranteed available to the underlying wrapped chooser, it should
+   * not be included in this map.
+   */
+  var latestMessageOffsets: Map[SystemStreamPartition, String] = Map(),
+
+  /**
+   * An object that holds all of the metrics related to bootstrapping.
+   */
+  metrics: BootstrappingChooserMetrics = new BootstrappingChooserMetrics) extends MessageChooser with Logging {
+
+  /**
+   * The number of lagging partitions for each SystemStream that's behind.
+   */
+  var systemStreamLagCounts = latestMessageOffsets
+    .keySet
+    .groupBy(_.getSystemStream)
+    .mapValues(partitions => partitions.size)
+
+  /**
+   * The total number of SystemStreams that are lagging.
+   */
+  var systemStreamLagSize = systemStreamLagCounts.size
+
+  /**
+   * The number of lagging partitions that the underlying wrapped chooser has
+   * been updated with, grouped by SystemStream.
+   */
+  var updatedSystemStreams = Map[SystemStream, Int]()
+
+  def start = {
+    debug("Starting bootstrapping chooser with latest message offsets: %s" format latestMessageOffsets)
+    info("Got lagging partition counts for bootstrap streams: %s" format systemStreamLagCounts)
+    metrics.setLaggingSystemStreams(() => systemStreamLagSize)
+    systemStreamLagCounts.keys.foreach { (systemStream: SystemStream) =>
+      metrics.setLagCount(systemStream, () => systemStreamLagCounts.getOrElse(systemStream, 0))
+    }
+    wrapped.start
+  }
+
+  def stop = wrapped.stop
+
+  override def register(systemStreamPartition: SystemStreamPartition, lastReadOffset: String) {
+    debug("Registering stream partition with last read offset: %, %s" format (systemStreamPartition, lastReadOffset))
+
+    // If the last offset read is the same as the latest offset in the SSP, 
+    // then we're already at head for this SSP, so remove it from the lag list.
+    checkOffset(systemStreamPartition, lastReadOffset)
+
+    wrapped.register(systemStreamPartition, lastReadOffset)
+  }
+
+  def update(envelope: IncomingMessageEnvelope) {
+    wrapped.update(envelope)
+
+    // If this is an SSP that is still lagging, update the count for the stream.
+    if (latestMessageOffsets.contains(envelope.getSystemStreamPartition)) {
+      trace("Bumping available message count for stream partition: %s" format envelope.getSystemStreamPartition)
+
+      val systemStream = envelope.getSystemStreamPartition.getSystemStream
+
+      updatedSystemStreams += systemStream -> (updatedSystemStreams.getOrElse(systemStream, 0) + 1)
+    }
+  }
+
+  /**
+   * If choose is called, and the parent MessageChooser has received an
+   * envelope from at least one partition in each lagging SystemStream, then
+   * the choose call is forwarded to the wrapped chooser. Otherwise, the
+   * BootstrappingChooser simply returns null, and waits for more updates.
+   */
+  def choose = {
+    // If no system streams are behind, then go straight to the wrapped chooser.
+    if (systemStreamLagSize == 0) {
+      trace("No streams are lagging, so bypassing bootstrap chooser.")
+
+      wrapped.choose
+    } else if (okToChoose) {
+      trace("Choosing from wrapped chooser, since wrapped choser has an envelope from all bootstrap streams.")
+
+      val envelope = wrapped.choose
+
+      if (envelope != null) {
+        trace("Wrapped chooser chose non-null envelope: %s" format envelope)
+
+        val systemStreamPartition = envelope.getSystemStreamPartition
+        val offset = envelope.getOffset
+
+        // Chosen envelope was from a bootstrap SSP, so decrement the update map.
+        if (latestMessageOffsets.contains(systemStreamPartition)) {
+          val systemStream = systemStreamPartition.getSystemStream
+
+          updatedSystemStreams += systemStream -> (updatedSystemStreams.getOrElse(systemStream, 0) - 1)
+        }
+
+        checkOffset(systemStreamPartition, offset)
+      }
+
+      envelope
+    } else {
+      trace("Blocking wrapped.chooser since bootstrapping is not done, but not all streams have messages available.")
+      null
+    }
+  }
+
+  private def checkOffset(systemStreamPartition: SystemStreamPartition, offset: String) {
+    val latestOffset = latestMessageOffsets.getOrElse(systemStreamPartition, null)
+    val systemStream = systemStreamPartition.getSystemStream
+
+    trace("Check offset: %s, %s" format (systemStreamPartition, offset))
+
+    // The SSP is no longer lagging if the envelope's offset equals the 
+    // lastOffset map. 
+    if (offset != null && offset.equals(latestOffset)) {
+      latestMessageOffsets -= systemStreamPartition
+      systemStreamLagCounts += systemStream -> (systemStreamLagCounts(systemStream) - 1)
+
+      debug("Bootstrap stream partition is fully caught up: %s" format systemStreamPartition)
+
+      if (systemStreamLagCounts(systemStream) == 0) {
+        info("Bootstrap stream is fully caught up: %s" format systemStream)
+
+        // If the lag count is 0, then no partition for this stream is lagging 
+        // (the stream has been fully caught up).
+        systemStreamLagSize -= 1
+        systemStreamLagCounts -= systemStream
+      }
+    }
+  }
+
+  /**
+   * It's only OK to allow the wrapped MessageChooser to choose if it's been
+   * given at least one envelope from each lagging SystemStream.
+   */
+  private def okToChoose = {
+    updatedSystemStreams.values.filter(_ > 0).size == systemStreamLagSize
+  }
+}
+
+class BootstrappingChooserMetrics(val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
+  val batches = newCounter("batch-resets")
+
+  def setLaggingSystemStreams(getValue: () => Int) {
+    newGauge("lagging-batch-streams", getValue)
+  }
+
+  def setLagCount(systemStream: SystemStream, getValue: () => Int) {
+    newGauge("%s-%s-lagging-partitions" format (systemStream.getSystem, systemStream.getStream), getValue)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala
new file mode 100644
index 0000000..53fee8e
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.chooser
+
+import scala.collection.JavaConversions._
+import org.apache.samza.SamzaException
+import org.apache.samza.config.Config
+import org.apache.samza.config.SystemConfig._
+import org.apache.samza.config.DefaultChooserConfig._
+import org.apache.samza.config.TaskConfig._
+import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.system.SystemFactory
+import org.apache.samza.system.SystemStream
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.util.Util
+import org.apache.samza.system.SystemAdmin
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.metrics.MetricsRegistryMap
+import grizzled.slf4j.Logging
+
+object DefaultChooser extends Logging {
+  def apply(systemAdmins: Map[String, SystemAdmin], chooserFactory: MessageChooserFactory, config: Config, registry: MetricsRegistry) = {
+    val batchSize = config.getChooserBatchSize match {
+      case Some(batchSize) => Some(batchSize.toInt)
+      case _ => None
+    }
+
+    debug("Got batch size: %s" format batchSize)
+
+    // Normal streams default to priority 0.
+    val defaultPrioritizedStreams = config
+      .getInputStreams
+      .map((_, 0))
+      .toMap
+
+    debug("Got default priority streams: %s" format defaultPrioritizedStreams)
+
+    // Bootstrap streams default to Int.MaxValue priority.
+    val prioritizedBootstrapStreams = config
+      .getBootstrapStreams
+      .map((_, Int.MaxValue))
+      .toMap
+
+    debug("Got bootstrap priority streams: %s" format prioritizedBootstrapStreams)
+
+    // Explicitly prioritized streams are set to whatever they were configured to.
+    val prioritizedStreams = config.getPriorityStreams
+
+    debug("Got prioritized streams: %s" format prioritizedStreams)
+
+    // Only wire in what we need.
+    val useBootstrapping = prioritizedBootstrapStreams.size > 0
+    val usePriority = useBootstrapping || prioritizedStreams.size > 0
+
+    // Build a map from SSP -> lastReadOffset for each bootstrap stream.
+    val latestMessageOffsets = prioritizedBootstrapStreams
+      .keySet
+      // Group streams by system (system -> [streams])
+      .groupBy(_.getSystem())
+      // Get the SystemAdmin for each system, and get all lastReadOffsets for 
+      // each stream. Flatten into a simple SSP -> lastReadOffset map.
+      .flatMap {
+        case (systemName, streams) =>
+          systemAdmins
+            .getOrElse(systemName, throw new SamzaException("Trying to fetch system factory for system %s, which isn't defined in config." format systemName))
+            .getLastOffsets(streams.map(_.getStream))
+      }
+
+    debug("Got bootstrap offsets: %s" format latestMessageOffsets)
+
+    val priorities = if (usePriority) {
+      // Ordering is important here. Overrides Int.MaxValue default for 
+      // bootstrap streams with explicitly configured values, in cases where 
+      // users have defined a bootstrap stream's priority in config.
+      defaultPrioritizedStreams ++ prioritizedBootstrapStreams ++ prioritizedStreams
+    } else {
+      Map[SystemStream, Int]()
+    }
+
+    debug("Got fully prioritized stream list: %s" format priorities)
+
+    val prioritizedChoosers = priorities
+      .values
+      .toSet
+      .map((_: Int, chooserFactory.getChooser(config, registry)))
+      .toMap
+
+    new DefaultChooser(
+      chooserFactory.getChooser(config, registry),
+      batchSize,
+      priorities,
+      prioritizedChoosers,
+      latestMessageOffsets,
+      registry)
+  }
+}
+
+/**
+ * DefaultChooser adds additional functionality to an existing MessageChooser.
+ *
+ * The following behaviors are currently supported:
+ *
+ * 1. Batching.
+ * 2. Prioritized streams.
+ * 3. Bootstrapping.
+ *
+ * By default, this chooser will not do any of this. It will simply default to
+ * a RoundRobinChooser.
+ *
+ * To activate batching, you must define:
+ *
+ *   task.consumer.batch.size
+ *
+ * To define a priority for a stream, you must define:
+ *
+ *   systems.<system>.streams.<stream>.samza.priority
+ *
+ * To declare a bootstrap stream, you must define:
+ *
+ *   systems.<system>.streams.<stream>.samza.bootstrap
+ *
+ * When batching is activated, the DefaultChooserFactory will allow the
+ * initial strategy to be executed once (by default, this is RoundRobin). It
+ * will then keep picking the SystemStreamPartition that the RoundRobin
+ * chooser selected, up to the batch size, provided that there are messages
+ * available for this SystemStreamPartition. If the batch size is reached, or
+ * there are no messages available, the RoundRobinChooser will be executed
+ * again, and the batching process will repeat itself.
+ *
+ * When a stream is defined with a priority, it is preferred over all lower
+ * priority streams in cases where there are messages available from both
+ * streams. If two envelopes exist for two SystemStreamPartitions that have
+ * the same priority, the default strategy is used to determine which envelope
+ * to use (RoundRobinChooser, by default). If a stream doesn't have a
+ * configured priority, its priority is 0. Higher priority streams are
+ * preferred over lower priority streams.
+ *
+ * When a stream is defined as a bootstrap stream, it is prioritized with a
+ * default priority of Int.MaxValue. This priority can be overridden using the
+ * same priority configuration defined above
+ * (systems.<system>.streams.<stream>.samza.priority). The DefaultChooserFactory
+ * guarantees that the wrapped MessageChooser will have
+ * at least one envelope from each bootstrap stream whenever the wrapped
+ * MessageChooser must make a decision about which envelope to process next.
+ * If a stream is defined as a bootstrap stream, and is prioritized higher
+ * than all other streams, it means that all messages in the stream will be
+ * processed (up to head) before any other messages are processed. Once all of
+ * a bootstrap stream's partitions catch up to head, the stream is marked as
+ * fully bootstrapped, and it is then treated like a normal prioritized stream.
+ *
+ * Valid configurations include:
+ *
+ *   task.consumer.batch.size=100
+ *
+ * This configuration will just batch up to 100 messages from each
+ * SystemStreamPartition. It will use a RoundRobinChooser whenever it needs to
+ * find the next SystemStreamPartition to batch.
+ *
+ *   systems.kafka.streams.mystream.samza.priority=1
+ *
+ * This configuration will prefer messages from kafka.mystream over any other
+ * input streams (since other input streams will default to priority 0).
+ *
+ *   systems.kafka.streams.profiles.samza.bootstrap=true
+ *
+ * This configuration will process all messages from kafka.profiles up to the
+ * current head of the profiles stream before any other messages are processed.
+ * From then on, the profiles stream will be preferred over any other stream in
+ * cases where incoming envelopes are ready to be processed from it.
+ *
+ *   task.consumer.batch.size=100
+ *   systems.kafka.streams.profiles.samza.bootstrap=true
+ *   systems.kafka.streams.mystream.samza.priority=1
+ *
+ * This configuration will read all messages from kafka.profiles from the last
+ * checkpointed offset, up to head. It will then prefer messages from profiles
+ * over mystream, and prefer messages from mystream over any other stream. In
+ * cases where there is more than one envelope available with the same priority
+ * (e.g. two envelopes from different partitions in the profiles stream),
+ * RoundRobinChooser will be used to break the tie. Once the tie is broken, up
+ * to 100 messages will be read from the envelope's SystemStreamPartition,
+ * before RoundRobinChooser is consulted again to break the next tie.
+ *
+ *   systems.kafka.streams.profiles.samza.bootstrap=true
+ *   systems.kafka.streams.profiles.samza.reset.offset=true
+ *
+ * This configuration will bootstrap the profiles stream the same way as the
+ * last example, except that it will always start from offset zero, which means
+ * that it will always read all messages in the topic from oldest to newest.
+ */
+class DefaultChooser(
+  /**
+   * The wrapped chooser serves two purposes. In cases where bootstrapping or
+   * prioritization is enabled, wrapped chooser serves as the default for
+   * envelopes that have no priority defined.
+   *
+   * When prioritization and bootstrapping are not enabled, but batching is,
+   * wrapped chooser is used as the strategy to determine which
+   * SystemStreamPartition to batch next.
+   *
+   * When nothing is enabled, DefaultChooser just acts as a pass through for
+   * the wrapped chooser.
+   */
+  DefaultChooser: MessageChooser = new RoundRobinChooser,
+
+  /**
+   * If defined, enables batching, and defines a max message size for a given
+   * batch. Once the batch size is exceeded (at least batchSize messages have
+   * been processed from a single system stream), the wrapped chooser is used
+   * to determine the next system stream to process.
+   */
+  batchSize: Option[Int] = None,
+
+  /**
+   * Defines a mapping from SystemStream to a priority tier. Envelopes from
+   * higher priority SystemStreams are processed before envelopes from lower
+   * priority SystemStreams.
+   *
+   * If multiple envelopes exist within a single tier, the prioritized chooser
+   * (defined below) is used to break the tie.
+   *
+   * If this map is empty, prioritization will not happen.
+   */
+  prioritizedStreams: Map[SystemStream, Int] = Map(),
+
+  /**
+   * Defines the tie breaking strategy to be used at each tier of the priority
+   * chooser. This chooser is used to break the tie when more than one envelope
+   * exists with the same priority.
+   */
+  prioritizedChoosers: Map[Int, MessageChooser] = Map(),
+
+  /**
+   * Defines a mapping from SystemStreamPartition to the offset of the last
+   * message in each SSP. Bootstrap streams are marked as "behind" until all
+   * SSPs for the SystemStream have been read. Once the bootstrap stream has
+   * been "caught up" it is removed from the bootstrap set, and treated as a
+   * normal stream.
+   *
+   * If this map is empty, no streams will be treated as bootstrap streams.
+   *
+   * Using bootstrap streams automatically enables stream prioritization.
+   * Bootstrap streams default to a priority of Int.MaxValue.
+   */
+  bootstrapStreamOffsets: Map[SystemStreamPartition, String] = Map(),
+
+  /**
+   * Metrics registry to be used when wiring up wrapped choosers.
+   */
+  registry: MetricsRegistry = new MetricsRegistryMap) extends MessageChooser with Logging {
+
+  val chooser = {
+    val useBatching = batchSize.isDefined
+    val useBootstrapping = bootstrapStreamOffsets.size > 0
+    val usePriority = useBootstrapping || prioritizedStreams.size > 0
+
+    info("Building default chooser with: useBatching=%s, useBootstrapping=%s, usePriority=%s" format (useBatching, useBootstrapping, usePriority))
+
+    val maybePrioritized = if (usePriority) {
+      new TieredPriorityChooser(prioritizedStreams, prioritizedChoosers, DefaultChooser)
+    } else if (DefaultChooser == null) {
+      // Null wrapped chooser without a priority chooser is not allowed 
+      // because DefaultChooser needs an underlying message chooser.
+      throw new SamzaException("A null chooser was given to the DefaultChooser. This is not allowed unless you are using prioritized/bootstrap streams, which you're not.")
+    } else {
+      DefaultChooser
+    }
+
+    val maybeBatching = if (useBatching) {
+      new BatchingChooser(maybePrioritized, batchSize.get, new BatchingChooserMetrics(registry))
+    } else {
+      maybePrioritized
+    }
+
+    if (useBootstrapping) {
+      new BootstrappingChooser(maybeBatching, bootstrapStreamOffsets, new BootstrappingChooserMetrics(registry))
+    } else {
+      maybeBatching
+    }
+  }
+
+  def update(envelope: IncomingMessageEnvelope) {
+    chooser.update(envelope)
+  }
+
+  def choose = chooser.choose
+
+  def start = chooser.start
+
+  def stop = chooser.stop
+
+  def register(systemStreamPartition: SystemStreamPartition, lastReadOffset: String) = chooser.register(systemStreamPartition, lastReadOffset)
+}
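
To make the wiring above concrete, here is a minimal construction sketch. Only the class and parameter names come from the patch; the system name, stream names, offset, and batch size are hypothetical.

    import org.apache.samza.Partition
    import org.apache.samza.system.{ SystemStream, SystemStreamPartition }
    import org.apache.samza.system.chooser.{ DefaultChooser, RoundRobinChooser }

    val bootstrapSsp = new SystemStreamPartition("kafka", "bootstrap-stream", new Partition(0))

    val chooser = new DefaultChooser(
      new RoundRobinChooser(),                 // wrapped tie-breaker
      batchSize = Some(100),                   // enables BatchingChooser
      prioritizedStreams = Map(new SystemStream("kafka", "important-stream") -> 1),
      prioritizedChoosers = Map(1 -> new RoundRobinChooser()),
      bootstrapStreamOffsets = Map(bootstrapSsp -> "123")) // enables BootstrappingChooser

    // Per the wiring in the chooser val above, the resulting nesting is:
    //   BootstrappingChooser(BatchingChooser(TieredPriorityChooser(tiers, RoundRobinChooser)))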

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-core/src/main/scala/org/apache/samza/system/chooser/RoundRobinChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/RoundRobinChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/RoundRobinChooser.scala
new file mode 100644
index 0000000..5374121
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/RoundRobinChooser.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.chooser
+
+import java.util.ArrayDeque
+import org.apache.samza.config.Config
+import org.apache.samza.SamzaException
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.metrics.ReadableMetricsRegistry
+
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.metrics.MetricsHelper
+
+/**
+ * A chooser that round robins between all system stream partitions. This
+ * chooser makes the assumption that it will only ever receive one envelope
+ * at a time, per SystemStreamPartition. This is part of the contract between
+ * MessageChooser and SystemConsumers. If a second envelope from the same
+ * SystemStreamPartition is given to the RoundRobinChooser before
+ * RoundRobinChooser.choose has returned the first one, a SamzaException
+ * will be thrown.
+ */
+class RoundRobinChooser(metrics: RoundRobinChooserMetrics = new RoundRobinChooserMetrics) extends BaseMessageChooser {
+
+  /**
+   * SystemStreamPartitions that the chooser has received a message for, but
+   * has not yet returned. Envelopes for these SystemStreamPartitions should
+   * be in the queue.
+   */
+  var inflightSystemStreamPartitions = Set[SystemStreamPartition]()
+
+  /**
+   * Queue of potential messages to process. Round robin will always choose
+   * the message at the head of the queue. A queue can be used to implement
+   * round robin here because we only get one envelope per
+   * SystemStreamPartition at a time.
+   */
+  val q = new ArrayDeque[IncomingMessageEnvelope]()
+
+  override def start {
+    metrics.setBufferedMessages(() => q.size)
+  }
+
+  def update(envelope: IncomingMessageEnvelope) = {
+    if (inflightSystemStreamPartitions.contains(envelope.getSystemStreamPartition)) {
+      throw new SamzaException("Received more than one envelope from the same "
+        + "SystemStreamPartition without returning the last. This is a "
+        + "violation of the contract with SystemConsumers, and breaks this "
+        + "RoundRobin implementation.")
+    }
+
+    q.add(envelope)
+    inflightSystemStreamPartitions += envelope.getSystemStreamPartition
+  }
+
+  def choose = {
+    val envelope = q.poll
+
+    if (envelope != null) {
+      inflightSystemStreamPartitions -= envelope.getSystemStreamPartition
+    }
+
+    envelope
+  }
+}
+
+
+class RoundRobinChooserMetrics(val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
+  def setBufferedMessages(getValue: () => Int) {
+    newGauge("buffered-messages", getValue)
+  }
+}
+
+class RoundRobinChooserFactory extends MessageChooserFactory {
+  def getChooser(config: Config, registry: MetricsRegistry) = new RoundRobinChooser(new RoundRobinChooserMetrics(registry))
+}
\ No newline at end of file
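
A small sketch of the one-envelope-per-SSP contract described above; the stream name, partitions, and message values are hypothetical.

    import org.apache.samza.Partition
    import org.apache.samza.system.{ IncomingMessageEnvelope, SystemStreamPartition }
    import org.apache.samza.system.chooser.RoundRobinChooser

    val ssp0 = new SystemStreamPartition("kafka", "stream", new Partition(0))
    val ssp1 = new SystemStreamPartition("kafka", "stream", new Partition(1))
    val chooser = new RoundRobinChooser()

    chooser.update(new IncomingMessageEnvelope(ssp0, null, null, "a"))
    chooser.update(new IncomingMessageEnvelope(ssp1, null, null, "b"))
    // Updating ssp0 again here, before its "a" envelope has been chosen, would
    // throw a SamzaException, per the contract described in the class comment.

    chooser.choose // the "a" envelope (head of the queue)
    chooser.choose // the "b" envelope
    chooser.choose // null -- nothing is buffered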

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-core/src/main/scala/org/apache/samza/system/chooser/TieredPriorityChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/TieredPriorityChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/TieredPriorityChooser.scala
new file mode 100644
index 0000000..94b2b66
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/TieredPriorityChooser.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.chooser
+
+import scala.collection.immutable.TreeMap
+import org.apache.samza.SamzaException
+import org.apache.samza.system.SystemStream
+import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.system.SystemStreamPartition
+import grizzled.slf4j.Logging
+
+/**
+ * TieredPriorityChooser groups messages into priority tiers. Each priority
+ * tier has its own MessageChooser. When an envelope arrives, its priority is
+ * determined based on its SystemStream, and the MessageChooser for
+ * the envelope's priority tier is updated.
+ *
+ * When choose is called, the TieredPriorityChooser calls choose on each
+ * MessageChooser from the highest priority tier down to the lowest.
+ * TieredPriorityChooser stops calling choose the first time that it gets a
+ * non-null envelope from a MessageChooser.
+ *
+ * A higher number means a higher priority.
+ *
+ * For example, suppose that there are two SystemStreams, X and Y. X has a
+ * priority of 1, and Y has a priority of 0. In this case, there are two tiers:
+ * 1, and 0. Each tier has its own MessageChooser. When an envelope is received
+ * via TieredPriorityChooser.update(), TieredPriorityChooser will determine the
+ * priority of the stream (1 if X, 0 if Y), and update that tier's
+ * MessageChooser. When MessageChooser.choose is called, TieredPriorityChooser
+ * will first call choose on tier 1's MessageChooser. If this MessageChooser
+ * returns null, then TieredPriorityChooser will call choose on tier 0's
+ * MessageChooser. If neither returns an envelope, then null is returned.
+ *
+ * This class is useful in cases where you wish to prioritize streams, and
+ * always pick one over another. In such a case, you need a tie-breaker if
+ * multiple envelopes exist that are of the same priority. This class uses
+ * a tier's MessageChooser as the tie breaker when more than one envelope
+ * exists with the same priority.
+ */
+class TieredPriorityChooser(
+  /**
+   * Map from stream to priority tier.
+   */
+  priorities: Map[SystemStream, Int],
+
+  /**
+   * Map from priority tier to chooser.
+   */
+  choosers: Map[Int, MessageChooser],
+
+  /**
+   * Default chooser to use if no priority is defined for an incoming
+   * envelope's SystemStream. If null, an exception is thrown when an unknown
+   * SystemStream is seen.
+   */
+  default: MessageChooser = null) extends MessageChooser with Logging {
+
+  // Sanity check: every priority tier that a stream maps to must have a chooser.
+  priorities.values.toSet.foreach((priority: Int) =>
+    if (!choosers.contains(priority)) {
+      throw new SamzaException("Missing message chooser for priority: %s" format priority)
+    })
+
+  /**
+   * A sorted list of MessageChoosers. Sorting is according to their priority,
+   * from high to low.
+   */
+  val prioritizedChoosers = choosers
+    .keys
+    .toList
+    .sortWith(_ > _)
+    .map(choosers(_))
+
+  /**
+   * A map from a SystemStream to the MessageChooser that should be used for
+   * the SystemStream.
+   */
+  val prioritizedStreams = priorities
+    .map(systemStreamPriority => (systemStreamPriority._1, choosers.getOrElse(systemStreamPriority._2, throw new SamzaException("Unable to setup priority chooser. No chooser found for priority: %s" format systemStreamPriority._2))))
+    .toMap
+
+  def update(envelope: IncomingMessageEnvelope) {
+    val systemStream = envelope.getSystemStreamPartition.getSystemStream
+    val chooser = prioritizedStreams.get(systemStream) match {
+      case Some(chooser) =>
+        trace("Got prioritized chooser for stream: %s" format systemStream)
+
+        chooser
+      case _ =>
+        trace("Trying default chooser because no priority is defined stream: %s" format systemStream)
+
+        if (default != null) {
+          default
+        } else {
+          throw new SamzaException("No default chooser defined, and no priority assigned to stream. Can't prioritize: %s" format envelope.getSystemStreamPartition)
+        }
+    }
+
+    chooser.update(envelope)
+  }
+
+  /**
+   * Choose a message from the highest priority MessageChooser. Keep going to
+   * lower priority MessageChoosers until a non-null envelope is returned, or
+   * the end of the list is reached. Return null if no envelopes are returned
+   * from any MessageChoosers.
+   */
+  def choose = {
+    var envelope: IncomingMessageEnvelope = null
+    val iter = prioritizedChoosers.iterator
+
+    while (iter.hasNext && envelope == null) {
+      envelope = iter.next.choose
+    }
+
+    if (envelope == null && default != null) {
+      trace("Got no prioritized envelope, so checking default chooser.")
+      default.choose
+    } else {
+      trace("Got prioritized envelope: %s" format envelope)
+      envelope
+    }
+  }
+
+  def start = {
+    info("Starting priority chooser with priorities: %s" format priorities)
+
+    if (default != null) {
+      info("Priority chooser has a default chooser: %s" format default)
+
+      default.start
+    }
+
+    choosers.values.foreach(_.start)
+  }
+
+  def stop = {
+    if (default != null) {
+      default.stop
+    }
+
+    choosers.values.foreach(_.stop)
+  }
+
+  def register(systemStreamPartition: SystemStreamPartition, lastReadOffset: String) = {
+    if (default != null) {
+      default.register(systemStreamPartition, lastReadOffset)
+    }
+
+    choosers.values.foreach(_.register(systemStreamPartition, lastReadOffset))
+  }
+}
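
A sketch of the two-tier X/Y setup walked through in the class comment above, using a RoundRobinChooser as the tie-breaker in each tier; the system name, partition, and message values are hypothetical.

    import org.apache.samza.Partition
    import org.apache.samza.system.{ IncomingMessageEnvelope, SystemStream, SystemStreamPartition }
    import org.apache.samza.system.chooser.{ RoundRobinChooser, TieredPriorityChooser }

    val x = new SystemStream("kafka", "X") // priority 1 (higher)
    val y = new SystemStream("kafka", "Y") // priority 0 (lower)

    val chooser = new TieredPriorityChooser(
      Map(x -> 1, y -> 0),
      Map(1 -> new RoundRobinChooser(), 0 -> new RoundRobinChooser()))

    chooser.update(new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "Y", new Partition(0)), null, null, "fromY"))
    chooser.update(new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "X", new Partition(0)), null, null, "fromX"))

    chooser.choose // the "fromX" envelope: the tier-1 chooser is consulted first
    chooser.choose // the "fromY" envelope: tier 1 is empty, so tier 0 is consulted
    chooser.choose // null -- both tiers are empty and no default chooser was given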

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
new file mode 100644
index 0000000..e941125
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
@@ -0,0 +1,42 @@
+package org.apache.samza.system
+
+import scala.collection.JavaConversions._
+import org.apache.samza.Partition
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.samza.system.chooser.MessageChooser
+
+class TestSystemConsumers {
+  @Test
+  def testSystemConsumersShouldRegisterStartAndStopChooser {
+    val system = "test-system"
+    val systemStreamPartition = new SystemStreamPartition(system, "some-stream", new Partition(1))
+    var started = 0
+    var stopped = 0
+    var registered = Map[SystemStreamPartition, String]()
+
+    val consumer = Map(system -> new SystemConsumer {
+      def start {}
+      def stop {}
+      def register(systemStreamPartition: SystemStreamPartition, lastReadOffset: String) {}
+      def poll(systemStreamPartitions: java.util.Map[SystemStreamPartition, java.lang.Integer], timeout: Long) = List()
+    })
+
+    val consumers = new SystemConsumers(new MessageChooser {
+      def update(envelope: IncomingMessageEnvelope) {}
+      def choose = null
+      def start = started += 1
+      def stop = stopped += 1
+      def register(systemStreamPartition: SystemStreamPartition, lastReadOffset: String) = registered += systemStreamPartition -> lastReadOffset
+    }, consumer, null)
+
+    consumers.register(systemStreamPartition, "0")
+    consumers.start
+    consumers.stop
+
+    assertEquals(1, started)
+    assertEquals(1, stopped)
+    assertEquals(1, registered.size)
+    assertEquals("0", registered(systemStreamPartition))
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-core/src/test/scala/org/apache/samza/system/chooser/MockMessageChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/MockMessageChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/MockMessageChooser.scala
new file mode 100644
index 0000000..d8a2f78
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/MockMessageChooser.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.chooser
+
+import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.system.SystemStreamPartition
+
+class MockMessageChooser extends MessageChooser {
+  var envelopes = scala.collection.mutable.Queue[IncomingMessageEnvelope]()
+  var starts = 0
+  var stops = 0
+  var registers = Map[SystemStreamPartition, String]()
+
+  def start = starts += 1
+
+  def stop = stops += 1
+
+  def register(systemStreamPartition: SystemStreamPartition, lastReadOffset: String) = registers += systemStreamPartition -> lastReadOffset
+
+  def update(envelope: IncomingMessageEnvelope) {
+    envelopes += envelope
+  }
+
+  def choose = {
+    try {
+      envelopes.dequeue
+    } catch {
+      case e: NoSuchElementException => null
+    }
+  }
+
+  def getEnvelopes = envelopes
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBatchingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBatchingChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBatchingChooser.scala
new file mode 100644
index 0000000..d7632b4
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBatchingChooser.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.chooser
+
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.samza.system.IncomingMessageEnvelope
+import scala.collection.immutable.Queue
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.Partition
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.runners.Parameterized.Parameters
+import java.util.Arrays
+
+@RunWith(value = classOf[Parameterized])
+class TestBatchingChooser(getChooser: (MessageChooser, Int) => MessageChooser) {
+  @Test
+  def testChooserShouldHandleBothBatchSizeOverrunAndNoEnvelopeAvailable {
+    val envelope1 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), null, null, 1);
+    val envelope2 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), null, null, 2);
+    val envelope3 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), null, null, 3);
+    val mock = new MockMessageChooser
+    val chooser = getChooser(mock, 2)
+
+    chooser.register(envelope1.getSystemStreamPartition, null)
+    chooser.register(envelope2.getSystemStreamPartition, "")
+    chooser.start
+    // Make sure start and register are working.
+    assertEquals(1, mock.starts)
+    assertEquals(null, mock.registers(envelope1.getSystemStreamPartition))
+    assertEquals("", mock.registers(envelope2.getSystemStreamPartition))
+    assertEquals(null, chooser.choose)
+    chooser.update(envelope1)
+    assertEquals(envelope1, mock.getEnvelopes.head)
+    assertEquals(envelope1, chooser.choose)
+    assertEquals(None, mock.getEnvelopes.headOption)
+    chooser.update(envelope2)
+    assertEquals(envelope2, mock.getEnvelopes.head)
+    // This envelope should be batched, and therefore cached in the BatchingChooser.
+    chooser.update(envelope3)
+    assertEquals(1, mock.getEnvelopes.size)
+    assertEquals(envelope2, mock.getEnvelopes.head)
+    // envelope3 is preferred over envelope2, since envelope3 is from the same SSP as the last chosen envelope (envelope1).
+    assertEquals(envelope3, chooser.choose)
+    // Since the batch size is 2, the batch should now be reset, and the chooser should fall back to the mock.
+    chooser.update(envelope1)
+    assertEquals(2, mock.getEnvelopes.size)
+    assertEquals(envelope2, mock.getEnvelopes.head)
+    assertEquals(envelope1, mock.getEnvelopes.last)
+    assertEquals(envelope2, chooser.choose)
+    assertEquals(1, mock.getEnvelopes.size)
+    assertEquals(envelope1, mock.getEnvelopes.head)
+    // Now envelope 2's SSP (kafka.stream1, partition 1) is preferred, but no new envelopes for this partition have been loaded.
+    // Let's trigger a fallback to the mock, which resets the preferred SSP to envelope1's SSP (kafka.stream, partition 0).
+    assertEquals(envelope1, chooser.choose)
+    assertEquals(0, mock.getEnvelopes.size)
+    // Now verify that envelope1's SSP (kafka.stream, partition 0) is preferred again.
+    chooser.update(envelope2)
+    chooser.update(envelope1)
+    assertEquals(1, mock.getEnvelopes.size)
+    assertEquals(envelope2, mock.getEnvelopes.head)
+    assertEquals(envelope1, chooser.choose)
+    chooser.stop
+    assertEquals(1, mock.stops)
+  }
+}
+
+object TestBatchingChooser {
+  // Test both BatchingChooser and DefaultChooser here. DefaultChooser with 
+  // just batch size defined should behave just like plain vanilla batching 
+  // chooser.
+  @Parameters
+  def parameters: java.util.Collection[Array[(MessageChooser, Int) => MessageChooser]] = Arrays.asList(
+    Array((wrapped: MessageChooser, batchSize: Int) => new BatchingChooser(wrapped, batchSize)),
+    Array((wrapped: MessageChooser, batchSize: Int) => new DefaultChooser(wrapped, Some(batchSize))))
+}
\ No newline at end of file
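
The walkthrough in the test above condenses to the following sketch of the batching behavior with a batch size of 2; the envelope values mirror the test, and the shortened update/choose sequence is otherwise hypothetical.

    import org.apache.samza.Partition
    import org.apache.samza.system.{ IncomingMessageEnvelope, SystemStreamPartition }
    import org.apache.samza.system.chooser.{ BatchingChooser, RoundRobinChooser }

    val sspA = new SystemStreamPartition("kafka", "stream", new Partition(0))
    val sspB = new SystemStreamPartition("kafka", "stream1", new Partition(1))
    val chooser = new BatchingChooser(new RoundRobinChooser(), 2)

    chooser.update(new IncomingMessageEnvelope(sspA, null, null, 1))
    chooser.choose // envelope 1: sspA becomes the preferred SSP (batch count 1)
    chooser.update(new IncomingMessageEnvelope(sspB, null, null, 2))
    chooser.update(new IncomingMessageEnvelope(sspA, null, null, 3))
    chooser.choose // envelope 3: sspA is preferred over sspB (batch count 2 = batch size)
    chooser.choose // envelope 2: the batch is exhausted, so the wrapped chooser is consulted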