Posted to commits@samza.apache.org by jg...@apache.org on 2014/07/29 00:18:22 UTC

[2/4] SAMZA-123: Move topic partition grouping to the AM and generalize

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala b/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala
new file mode 100644
index 0000000..f8a535a
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job
+
+import org.junit.Test
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.Partition
+import org.apache.samza.util.Util._
+import org.apache.samza.container.{TaskName, TaskNamesToSystemStreamPartitions}
+import org.junit.Assert._
+
+class TestShellCommandBuilder {
+
+  @Test
+  def testJsonCreateStreamPartitionStringRoundTrip() {
+    val getPartitions: Set[SystemStreamPartition] = {
+      // Build a heavily skewed set of partitions.
+      def partitionSet(max:Int) = (0 until max).map(new Partition(_)).toSet
+      val system = "all-same-system."
+      val lotsOfParts = Map(system + "topic-with-many-parts-a" -> partitionSet(128),
+        system + "topic-with-many-parts-b" -> partitionSet(128), system + "topic-with-many-parts-c" -> partitionSet(64))
+      val fewParts = ('c' to 'z').map(l => system + l.toString -> partitionSet(4)).toMap
+      val streamsMap = (lotsOfParts ++ fewParts)
+      (for(s <- streamsMap.keys;
+           part <- streamsMap.getOrElse(s, Set.empty)) yield new SystemStreamPartition(getSystemStreamFromNames(s), part)).toSet
+    }
+
+    // Group by partition...
+    val sspTaskNameMap = TaskNamesToSystemStreamPartitions(getPartitions.groupBy(p => new TaskName(p.getPartition.toString)).toMap)
+
+    val asString = ShellCommandBuilder.serializeSystemStreamPartitionSetToJSON(sspTaskNameMap.getJavaFriendlyType)
+
+    val backFromSSPTaskNameMap = TaskNamesToSystemStreamPartitions(ShellCommandBuilder.deserializeSystemStreamPartitionSetFromJSON(asString))
+    assertEquals(sspTaskNameMap, backFromSSPTaskNameMap)
+  }
+}
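
For readers skimming the diff: the round trip above exercises the grouping shown below. A minimal sketch, using only names that appear in this patch (the exact string Partition.toString yields for the TaskName is an assumption):

    import org.apache.samza.Partition
    import org.apache.samza.container.TaskName
    import org.apache.samza.system.SystemStreamPartition

    val ssps = Set(
      new SystemStreamPartition("kafka", "a", new Partition(0)),
      new SystemStreamPartition("kafka", "b", new Partition(0)),
      new SystemStreamPartition("kafka", "a", new Partition(1)))

    // One TaskName per partition id; each TaskName maps to the SSPs that share that partition.
    val byTask: Map[TaskName, Set[SystemStreamPartition]] =
      ssps.groupBy(ssp => new TaskName(ssp.getPartition.toString))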

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala b/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala
index 4f7ddcd..d425e86 100644
--- a/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala
@@ -21,7 +21,6 @@ package org.apache.samza.metrics
 
 import org.junit.Assert._
 import org.junit.Test
-import org.apache.samza.config.MapConfig
 import grizzled.slf4j.Logging
 import javax.management.remote.{JMXConnector, JMXConnectorFactory, JMXServiceURL}
 import java.io.IOException

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
index 70d8c80..0d07314 100644
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
+++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
@@ -19,40 +19,40 @@
 
 package org.apache.samza.serializers
 
+import java.util
+import org.apache.samza.Partition
+import org.apache.samza.checkpoint.Checkpoint
+import org.apache.samza.container.TaskName
+import org.apache.samza.system.SystemStreamPartition
 import org.junit.Assert._
 import org.junit.Test
-import org.apache.samza.system.SystemStream
-import org.apache.samza.checkpoint.Checkpoint
 import scala.collection.JavaConversions._
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.SamzaException
-import org.apache.samza.Partition
 
 class TestCheckpointSerde {
   @Test
   def testExactlyOneOffset {
     val serde = new CheckpointSerde
-    var offsets = Map[SystemStream, String]()
-    val systemStream = new SystemStream("test-system", "test-stream")
-    offsets += systemStream -> "1"
+    var offsets = Map[SystemStreamPartition, String]()
+    val systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(777))
+    offsets += systemStreamPartition -> "1"
     val deserializedOffsets = serde.fromBytes(serde.toBytes(new Checkpoint(offsets)))
-    assertEquals("1", deserializedOffsets.getOffsets.get(systemStream))
+    assertEquals("1", deserializedOffsets.getOffsets.get(systemStreamPartition))
     assertEquals(1, deserializedOffsets.getOffsets.size)
   }
 
   @Test
-  def testMoreThanOneOffsetShouldFail {
-    val serde = new CheckpointSerde
-    var offsets = Map[SystemStream, String]()
-    // Since SS != SSP with same system and stream name, this should result in 
-    // two offsets for one system stream in the serde.
-    offsets += new SystemStream("test-system", "test-stream") -> "1"
-    offsets += new SystemStreamPartition("test-system", "test-stream", new Partition(0)) -> "2"
-    try {
-      serde.toBytes(new Checkpoint(offsets))
-      fail("Expected to fail with more than one offset for a single SystemStream.")
-    } catch {
-      case e: SamzaException => // expected this
-    }
+  def testChangelogPartitionMappingRoundTrip {
+    val mapping = new util.HashMap[TaskName, java.lang.Integer]()
+    mapping.put(new TaskName("Ted"), 0)
+    mapping.put(new TaskName("Dougal"), 1)
+    mapping.put(new TaskName("Jack"), 2)
+
+    val checkpointSerde = new CheckpointSerde
+    val asBytes = checkpointSerde.changelogPartitionMappingToBytes(mapping)
+    val backToMap = checkpointSerde.changelogPartitionMappingFromBytes(asBytes)
+
+    assertEquals(mapping, backToMap)
+    assertNotSame(mapping, backToMap)
   }
-}
\ No newline at end of file
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala
index d31c3ce..f505eb1 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala
@@ -19,17 +19,17 @@
 
 package org.apache.samza.system.filereader
 
-import org.junit.Test
-import org.junit.Assert._
-import org.apache.samza.system.SystemStreamPartition
-import org.junit.AfterClass
-import java.io.PrintWriter
 import java.io.File
+import java.io.FileWriter
+import java.io.PrintWriter
 import org.apache.samza.Partition
+import org.apache.samza.system.SystemStreamPartition
+import org.junit.AfterClass
+import org.junit.Assert._
+import org.junit.BeforeClass
+import org.junit.Test
 import scala.collection.JavaConversions._
 import scala.collection.mutable.HashMap
-import org.junit.BeforeClass
-import java.io.FileWriter
 
 object TestFileReaderSystemConsumer {
   val consumer = new FileReaderSystemConsumer("file-reader", null)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala
index 12f1e03..7cfeb5a 100644
--- a/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala
+++ b/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala
@@ -21,13 +21,15 @@ package org.apache.samza.task
 
 import org.junit.Assert._
 import org.junit.Test
-import org.apache.samza.Partition
 import org.apache.samza.task.TaskCoordinator.RequestScope
+import org.apache.samza.container.TaskName
 
 class TestReadableCoordinator {
+  val taskName = new TaskName("P0")
+
   @Test
   def testCommitTask {
-    val coord = new ReadableCoordinator(new Partition(0))
+    val coord = new ReadableCoordinator(taskName)
     assertFalse(coord.requestedCommitTask)
     assertFalse(coord.requestedCommitAll)
     coord.commit(RequestScope.CURRENT_TASK)
@@ -37,7 +39,7 @@ class TestReadableCoordinator {
 
   @Test
   def testCommitAll {
-    val coord = new ReadableCoordinator(new Partition(0))
+    val coord = new ReadableCoordinator(taskName)
     assertFalse(coord.requestedCommitTask)
     assertFalse(coord.requestedCommitAll)
     coord.commit(RequestScope.ALL_TASKS_IN_CONTAINER)
@@ -47,7 +49,7 @@ class TestReadableCoordinator {
 
   @Test
   def testShutdownNow {
-    val coord = new ReadableCoordinator(new Partition(0))
+    val coord = new ReadableCoordinator(taskName)
     assertFalse(coord.requestedShutdownOnConsensus)
     assertFalse(coord.requestedShutdownNow)
     coord.shutdown(RequestScope.ALL_TASKS_IN_CONTAINER)
@@ -57,7 +59,7 @@ class TestReadableCoordinator {
 
   @Test
   def testShutdownRequest {
-    val coord = new ReadableCoordinator(new Partition(0))
+    val coord = new ReadableCoordinator(taskName)
     assertFalse(coord.requestedShutdownOnConsensus)
     assertFalse(coord.requestedShutdownNow)
     coord.shutdown(RequestScope.CURRENT_TASK)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
index ad6d2da..7c314ce 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
@@ -20,9 +20,13 @@ package org.apache.samza.util
 
 import org.apache.samza.Partition
 import org.apache.samza.config.Config
 import org.apache.samza.config.MapConfig
+import org.apache.samza.container.{TaskName, TaskNamesToSystemStreamPartitions}
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.system.SystemFactory
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.util.Util._
 import org.junit.Assert._
@@ -54,48 +58,32 @@ class TestUtil {
   }
 
   @Test
-  def testGetTopicPartitionsForTask() {
-    def partitionSet(max:Int) = (0 until max).map(new Partition(_)).toSet
-
-    val taskCount = 4
-    val streamsMap = Map("kafka.a" -> partitionSet(4), "kafka.b" -> partitionSet(18), "timestream.c" -> partitionSet(24))
-    val streamsAndParts = (for(s <- streamsMap.keys;
-                               part <- streamsMap.getOrElse(s, Set.empty))
-    yield new SystemStreamPartition(getSystemStreamFromNames(s), part)).toSet
+  def testResolveTaskNameToChangelogPartitionMapping {
+    def testRunner(description:String, currentTaskNames:Set[TaskName], previousTaskNameMapping:Map[TaskName, Int],
+                   result:Map[TaskName, Int]) {
+      assertEquals("Failed: " + description, result,
+        Util.resolveTaskNameToChangelogPartitionMapping(currentTaskNames, previousTaskNameMapping))
+    }
 
-    for(i <- 0 until taskCount) {
-      val result: Set[SystemStreamPartition] = Util.getStreamsAndPartitionsForContainer(i, taskCount, streamsAndParts)
-      // b -> 18 % 4 = 2 therefore first two results should have an extra element
-      if(i < 2) {
-        assertEquals(12, result.size)
-      } else {
-        assertEquals(11, result.size)
-      }
+    testRunner("No change between runs",
+      Set(new TaskName("Partition 0")),
+      Map(new TaskName("Partition 0") -> 0),
+      Map(new TaskName("Partition 0") -> 0))
 
-      result.foreach(r => assertEquals(i, r.getPartition.getPartitionId % taskCount))
-    }
-  }
-  
-  @Test
-  def testJsonCreateStreamPartitionStringRoundTrip() {
-    val getPartitions: Set[SystemStreamPartition] = {
-      // Build a heavily skewed set of partitions.
-      def partitionSet(max:Int) = (0 until max).map(new Partition(_)).toSet
-      val system = "all-same-system."
-      val lotsOfParts = Map(system + "topic-with-many-parts-a" -> partitionSet(128),
-        system + "topic-with-many-parts-b" -> partitionSet(128), system + "topic-with-many-parts-c" -> partitionSet(64))
-      val fewParts = ('c' to 'z').map(l => system + l.toString -> partitionSet(4)).toMap
-      val streamsMap = (lotsOfParts ++ fewParts)
-      (for(s <- streamsMap.keys;
-           part <- streamsMap.getOrElse(s, Set.empty)) yield new SystemStreamPartition(getSystemStreamFromNames(s), part)).toSet
-    }
+    testRunner("New TaskName added, none missing this run",
+      Set(new TaskName("Partition 0"), new TaskName("Partition 1")),
+      Map(new TaskName("Partition 0") -> 0),
+      Map(new TaskName("Partition 0") -> 0, new TaskName("Partition 1") -> 1))
 
-    val streamsAndParts: Set[SystemStreamPartition] = getStreamsAndPartitionsForContainer(0, 4, getPartitions).toSet
-    println(streamsAndParts)
-    val asString = serializeSSPSetToJSON(streamsAndParts)
+    testRunner("New TaskName added, one missing this run",
+      Set(new TaskName("Partition 0"), new TaskName("Partition 2")),
+      Map(new TaskName("Partition 0") -> 0, new TaskName("Partition 1") -> 1),
+      Map(new TaskName("Partition 0") -> 0, new TaskName("Partition 1") -> 1, new TaskName("Partition 2") -> 2))
 
-    val backToStreamsAndParts = deserializeSSPSetFromJSON(asString)
-    assertEquals(streamsAndParts, backToStreamsAndParts)
+    testRunner("New TaskName added, all previous missing this run",
+      Set(new TaskName("Partition 3"), new TaskName("Partition 4")),
+      Map(new TaskName("Partition 0") -> 0, new TaskName("Partition 1") -> 1, new TaskName("Partition 2") -> 2),
+      Map(new TaskName("Partition 0") -> 0, new TaskName("Partition 1") -> 1, new TaskName("Partition 2") -> 2, new TaskName("Partition 3") -> 3, new TaskName("Partition 4") -> 4))
   }
 
   /**

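The three test cases above pin down the resolution rule: TaskNames seen before keep their changelog partitions (even when absent this run), and new TaskNames get the next unused partition ids. A minimal sketch of that rule, assuming new TaskNames are assigned in name order after the previous maximum:

    import org.apache.samza.container.TaskName

    def resolveSketch(current: Set[TaskName], previous: Map[TaskName, Int]): Map[TaskName, Int] = {
      var next = if (previous.isEmpty) -1 else previous.values.max
      val added = current.filterNot(previous.contains)
        .toList.sortBy(_.getTaskName) // deterministic ordering is an assumption
        .map { tn => next += 1; tn -> next }
      previous ++ added
    }
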
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala
new file mode 100644
index 0000000..5d8ee4f
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.checkpoint.kafka
+
+import java.util
+import org.apache.samza.SamzaException
+import org.apache.samza.container.TaskName
+import org.codehaus.jackson.`type`.TypeReference
+import org.codehaus.jackson.map.ObjectMapper
+import scala.collection.JavaConversions._
+
+/**
+ * Kafka Checkpoint Log-specific key used to identify what type of entry is
+ * written for any particular log entry.
+ *
+ * @param map Backing map to hold key values
+ */
+class KafkaCheckpointLogKey private (val map: Map[String, String]) {
+  // This might be better as a case class...
+  import KafkaCheckpointLogKey._
+
+  /**
+   * Serialize this key to bytes
+   * @return Key as bytes
+   */
+  def toBytes(): Array[Byte] = {
+    val jMap = new util.HashMap[String, String](map.size)
+    jMap.putAll(map)
+
+    JSON_MAPPER.writeValueAsBytes(jMap)
+  }
+
+  private def getKey = map.getOrElse(CHECKPOINT_KEY_KEY, throw new SamzaException("No " + CHECKPOINT_KEY_KEY  + " in map for Kafka Checkpoint log key"))
+
+  /**
+   * Is this key for a checkpoint entry?
+   *
+   * @return true iff this key's entry is for a checkpoint
+   */
+  def isCheckpointKey = getKey.equals(CHECKPOINT_KEY_TYPE)
+
+  /**
+   * Is this key for a changelog partition mapping?
+   *
+   * @return true iff this key's entry is for a changelog partition mapping
+   */
+  def isChangelogPartitionMapping = getKey.equals(CHANGELOG_PARTITION_KEY_TYPE)
+
+  /**
+   * If this Key is for a checkpoint entry, return its associated TaskName.
+   *
+   * @return TaskName for this checkpoint or throw an exception if this key does not have a TaskName entry
+   */
+  def getCheckpointTaskName = {
+    val asString = map.getOrElse(CHECKPOINT_TASKNAME_KEY, throw new SamzaException("No TaskName in checkpoint key: " + this))
+    new TaskName(asString)
+  }
+
+  def canEqual(other: Any): Boolean = other.isInstanceOf[KafkaCheckpointLogKey]
+
+  override def equals(other: Any): Boolean = other match {
+    case that: KafkaCheckpointLogKey =>
+      (that canEqual this) &&
+        map == that.map
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    val state = Seq(map)
+    state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
+  }
+}
+
+object KafkaCheckpointLogKey {
+  /**
+   *  Messages in the checkpoint log have keys associated with them. These keys are maps that describe the message's
+   *  type, either a checkpoint or a changelog-partition-mapping.
+   */
+  val CHECKPOINT_KEY_KEY = "type"
+  val CHECKPOINT_KEY_TYPE = "checkpoint"
+  val CHANGELOG_PARTITION_KEY_TYPE = "changelog-partition-mapping"
+  val CHECKPOINT_TASKNAME_KEY = "taskName"
+  val SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY = "systemstreampartition-grouper-factory"
+
+  /**
+   * Partition mapping keys have no dynamic values, so we just need one instance.
+   */
+  val CHANGELOG_PARTITION_MAPPING_KEY = new KafkaCheckpointLogKey(Map(CHECKPOINT_KEY_KEY -> CHANGELOG_PARTITION_KEY_TYPE))
+
+  private val JSON_MAPPER = new ObjectMapper()
+  val KEY_TYPEREFERENCE = new TypeReference[util.HashMap[String, String]]() {}
+
+  var systemStreamPartitionGrouperFactoryString:Option[String] = None
+
+  /**
+   * Set the name of the factory configured to provide the SystemStreamPartition grouping
+   * so it can be included in the key.
+   *
+   * @param str Config value of SystemStreamPartition Grouper Factory
+   */
+  def setSystemStreamPartitionGrouperFactoryString(str:String) = {
+    systemStreamPartitionGrouperFactoryString = Some(str)
+  }
+
+  /**
+   * Get the name of the factory configured to provide the SystemStreamPartition grouping
+   * so it can be included in the key
+   */
+  def getSystemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString.getOrElse(throw new SamzaException("No SystemStreamPartition grouping factory string has been set."))
+
+  /**
+   * Build a key for a checkpoint log entry for a particular TaskName
+   * @param taskName TaskName for which to build the checkpoint entry key
+   *
+   * @return Key for checkpoint log entry
+   */
+  def getCheckpointKey(taskName:TaskName) = {
+    val map = Map(CHECKPOINT_KEY_KEY -> CHECKPOINT_KEY_TYPE,
+      CHECKPOINT_TASKNAME_KEY -> taskName.getTaskName,
+      SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY -> getSystemStreamPartitionGrouperFactoryString)
+
+    new KafkaCheckpointLogKey(map)
+  }
+
+  /**
+   * Build a key for a changelog partition mapping entry
+   *
+   * @return Key for changelog partition mapping entry
+   */
+  def getChangelogPartitionMappingKey() = CHANGELOG_PARTITION_MAPPING_KEY
+
+  /**
+   * Deserialize a Kafka checkpoint log key
+   * @param bytes Serialized (via JSON) Kafka checkpoint log key
+   * @return Checkpoint log key
+   */
+  def fromBytes(bytes: Array[Byte]): KafkaCheckpointLogKey = {
+    try {
+      val jmap: util.HashMap[String, String] = JSON_MAPPER.readValue(bytes, KEY_TYPEREFERENCE)
+
+      if(!jmap.containsKey(CHECKPOINT_KEY_KEY)) {
+        throw new SamzaException("No type entry in checkpoint key: " + jmap)
+      }
+
+      // Only checkpoint keys have ssp grouper factory keys
+      if(jmap.get(CHECKPOINT_KEY_KEY).equals(CHECKPOINT_KEY_TYPE)) {
+        val sspGrouperFactory = jmap.get(SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY)
+
+        if (sspGrouperFactory == null) {
+          throw new SamzaException("No SystemStreamPartition Grouper factory entry in checkpoint key: " + jmap)
+        }
+
+        if (!sspGrouperFactory.equals(getSystemStreamPartitionGrouperFactoryString)) {
+          throw new DifferingSystemStreamPartitionGrouperFactoryValues(sspGrouperFactory, getSystemStreamPartitionGrouperFactoryString)
+        }
+      }
+
+      new KafkaCheckpointLogKey(jmap.toMap)
+    } catch {
+      case e: Exception =>
+        throw new SamzaException("Exception while deserializing checkpoint key", e)
+    }
+  }
+}
+
+class DifferingSystemStreamPartitionGrouperFactoryValues(inKey:String, inConfig:String) extends SamzaException {
+  override def getMessage() = "Checkpoint key's SystemStreamPartition Grouper factory (" + inKey +
+    ") does not match value from current configuration (" + inConfig + ").  " +
+    "This likely means the SystemStreamPartitionGrouper was changed between job runs, which is not supported."
+}
\ No newline at end of file
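
As a worked example of the key format above, getCheckpointKey(new TaskName("Partition 0")).toBytes() should produce JSON along the lines of the following (field order is up to Jackson; the grouper factory value is whatever was set, here the GroupByPartitionFactory used elsewhere in this patch):

    {"type": "checkpoint",
     "taskName": "Partition 0",
     "systemstreampartition-grouper-factory":
       "org.apache.samza.container.systemstreampartition.groupers.GroupByPartitionFactory"}

and getChangelogPartitionMappingKey().toBytes() to {"type": "changelog-partition-mapping"}.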

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 15245d4..fff62e4 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -19,76 +19,112 @@
 
 package org.apache.samza.checkpoint.kafka
 
-import org.I0Itec.zkclient.ZkClient
 import grizzled.slf4j.Logging
+import java.nio.ByteBuffer
+import java.util
 import kafka.admin.AdminUtils
-import kafka.api.FetchRequestBuilder
-import kafka.api.OffsetRequest
-import kafka.api.PartitionOffsetRequestInfo
+import kafka.api._
 import kafka.common.ErrorMapping
+import kafka.common.InvalidMessageSizeException
 import kafka.common.TopicAndPartition
 import kafka.common.TopicExistsException
-import kafka.common.InvalidMessageSizeException
 import kafka.common.UnknownTopicOrPartitionException
 import kafka.consumer.SimpleConsumer
+import kafka.message.InvalidMessageException
 import kafka.producer.KeyedMessage
-import kafka.producer.Partitioner
 import kafka.producer.Producer
-import kafka.serializer.Decoder
-import kafka.serializer.Encoder
 import kafka.utils.Utils
-import kafka.utils.VerifiableProperties
-import kafka.message.InvalidMessageException
-import org.apache.samza.Partition
+import org.I0Itec.zkclient.ZkClient
 import org.apache.samza.SamzaException
 import org.apache.samza.checkpoint.Checkpoint
 import org.apache.samza.checkpoint.CheckpointManager
+import org.apache.samza.container.TaskName
 import org.apache.samza.serializers.CheckpointSerde
-import org.apache.samza.serializers.Serde
 import org.apache.samza.system.kafka.TopicMetadataCache
-import org.apache.samza.util.TopicMetadataStore
 import org.apache.samza.util.ExponentialSleepStrategy
+import org.apache.samza.util.TopicMetadataStore
+import scala.collection.mutable
 
 /**
- * Kafka checkpoint manager is used to store checkpoints in a Kafka topic that
- * is uniquely identified by a job/partition combination. To read a checkpoint
- * for a given job and partition combination (e.g. my-job, partition 1), we
- * simply read the last message from the topic: __samza_checkpoint_my-job_1. If
- * the topic does not yet exist, we assume that there is not yet any state for
- * this job/partition pair, and return an empty checkpoint.
+ * Kafka checkpoint manager is used to store checkpoints in a Kafka topic.
+ * To read a checkpoint for a specific taskName, we find the newest message
+ * keyed to that taskName. If there is no such message, no checkpoint data
+ * exists.  The underlying log has a single partition into which all
+ * checkpoints and TaskName to changelog partition mappings are written.
  */
 class KafkaCheckpointManager(
   clientId: String,
   checkpointTopic: String,
   systemName: String,
-  totalPartitions: Int,
   replicationFactor: Int,
   socketTimeout: Int,
   bufferSize: Int,
   fetchSize: Int,
   metadataStore: TopicMetadataStore,
-  connectProducer: () => Producer[Partition, Array[Byte]],
+  connectProducer: () => Producer[Array[Byte], Array[Byte]],
   connectZk: () => ZkClient,
+  systemStreamPartitionGrouperFactoryString: String,
   retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
-  serde: Serde[Checkpoint] = new CheckpointSerde) extends CheckpointManager with Logging {
+  serde: CheckpointSerde = new CheckpointSerde) extends CheckpointManager with Logging {
+  import KafkaCheckpointManager._
+
+  var taskNames = Set[TaskName]()
+  var producer: Producer[Array[Byte], Array[Byte]] = null
+  var taskNamesToOffsets: Map[TaskName, Checkpoint] = null
 
-  var partitions = Set[Partition]()
-  var producer: Producer[Partition, Array[Byte]] = null
+  var startingOffset: Option[Long] = None // Where to start reading for each subsequent call of readCheckpoint
 
-  info("Creating KafkaCheckpointManager with: clientId=%s, checkpointTopic=%s, systemName=%s" format (clientId, checkpointTopic, systemName))
+  KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString(systemStreamPartitionGrouperFactoryString)
 
-  def writeCheckpoint(partition: Partition, checkpoint: Checkpoint) {
+  info("Creating KafkaCheckpointManager with: clientId=%s, checkpointTopic=%s, systemName=%s" format(clientId, checkpointTopic, systemName))
+
+  /**
+   * Write a Checkpoint for the specified taskName to the log
+   *
+   * @param taskName The Samza taskName for which to write the checkpoint.
+   * @param checkpoint The Checkpoint object holding the offsets to store.
+   **/
+  override def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) {
+    val key = KafkaCheckpointLogKey.getCheckpointKey(taskName)
+    val keyBytes = key.toBytes()
+    val msgBytes = serde.toBytes(checkpoint)
+
+    writeLog(CHECKPOINT_LOG4J_ENTRY, keyBytes, msgBytes)
+  }
+
+  /**
+   * Write the TaskName-to-changelog-partition mapping that this CheckpointManager maintains
+   *
+   * @param changelogPartitionMapping Each TaskName's partition within the changelog
+   */
+  override def writeChangeLogPartitionMapping(changelogPartitionMapping: util.Map[TaskName, java.lang.Integer]) {
+    val key = KafkaCheckpointLogKey.getChangelogPartitionMappingKey()
+    val keyBytes = key.toBytes()
+    val msgBytes = serde.changelogPartitionMappingToBytes(changelogPartitionMapping)
+
+    writeLog(CHANGELOG_PARTITION_MAPPING_LOG4J, keyBytes, msgBytes)
+  }
+
+  /**
+   * Common code for writing either checkpoints or changelog-partition-mappings to the log
+   *
+   * @param logType Type of entry that is being written, for logging
+   * @param key pre-serialized key for message
+   * @param msg pre-serialized message to write to log
+   */
+  private def writeLog(logType:String, key: Array[Byte], msg: Array[Byte]) {
     retryBackoff.run(
       loop => {
         if (producer == null) {
           producer = connectProducer()
         }
-        producer.send(new KeyedMessage(checkpointTopic, null, partition, serde.toBytes(checkpoint)))
+
+        producer.send(new KeyedMessage(checkpointTopic, key, 0, msg))
         loop.done
       },
 
       (exception, loop) => {
-        warn("Failed to send checkpoint %s for partition %s: %s. Retrying." format (checkpoint, partition, exception))
+        warn("Failed to write %s partition entry %s: %s. Retrying." format(logType, key, exception))
         debug("Exception detail:", exception)
         if (producer != null) {
           producer.close
@@ -98,124 +134,219 @@ class KafkaCheckpointManager(
     )
   }
 
-  def readLastCheckpoint(partition: Partition): Checkpoint = {
-    info("Reading checkpoint for partition %s." format partition.getPartitionId)
+  private def getConsumer(): SimpleConsumer = {
+    val metadataMap = TopicMetadataCache.getTopicMetadata(Set(checkpointTopic), systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics))
+    val metadata = metadataMap(checkpointTopic)
+    val partitionMetadata = metadata.partitionsMetadata
+      .filter(_.partitionId == 0)
+      .headOption
+      .getOrElse(throw new KafkaCheckpointException("Tried to find partition information for partition 0 for checkpoint topic, but it didn't exist in Kafka."))
+    val leader = partitionMetadata
+      .leader
+      .getOrElse(throw new SamzaException("No leader available for topic %s" format checkpointTopic))
+
+    info("Connecting to leader %s:%d for topic %s and to fetch all checkpoint messages." format(leader.host, leader.port, checkpointTopic))
+
+    new SimpleConsumer(leader.host, leader.port, socketTimeout, bufferSize, clientId)
+  }
+
+  private def getEarliestOffset(consumer: SimpleConsumer, topicAndPartition: TopicAndPartition): Long = consumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, -1)
+
+  private def getOffset(consumer: SimpleConsumer, topicAndPartition: TopicAndPartition, earliestOrLatest: Long): Long = {
+    val offsetResponse = consumer.getOffsetsBefore(new OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1))))
+      .partitionErrorAndOffsets
+      .get(topicAndPartition)
+      .getOrElse(throw new KafkaCheckpointException("Unable to find offset information for %s:0" format checkpointTopic))
+    // Fail or retry if there was an issue with the offset request.
+    ErrorMapping.maybeThrowException(offsetResponse.error)
+
+    val offset: Long = offsetResponse
+      .offsets
+      .headOption
+      .getOrElse(throw new KafkaCheckpointException("Got response, but no offsets defined for %s:0" format checkpointTopic))
+
+    offset
+  }
+
+  /**
+   * Read the last checkpoint for the specified TaskName
+   *
+   * @param taskName The Samza taskName for which to read the last checkpoint.
+   **/
+  override def readLastCheckpoint(taskName: TaskName): Checkpoint = {
+    if (!taskNames.contains(taskName)) {
+      throw new SamzaException(taskName + " not registered with this CheckpointManager")
+    }
+
+    info("Reading checkpoint for taskName " + taskName)
+
+    if (taskNamesToOffsets == null) {
+      info("No TaskName to checkpoint mapping provided.  Reading for first time.")
+      taskNamesToOffsets = readCheckpointsFromLog()
+    } else {
+      info("Already existing checkpoint mapping.  Merging new offsets")
+      taskNamesToOffsets ++= readCheckpointsFromLog()
+    }
+
+    val checkpoint = taskNamesToOffsets.get(taskName).orNull
+
+    info("Got checkpoint state for taskName %s: %s" format(taskName, checkpoint))
+
+    checkpoint
+  }
+
+  /**
+   * Read through the entire log, discarding changelog mappings, and build a map of TaskNames to Checkpoints
+   */
+  private def readCheckpointsFromLog(): Map[TaskName, Checkpoint] = {
+    val checkpoints = mutable.Map[TaskName, Checkpoint]()
+
+    def shouldHandleEntry(key: KafkaCheckpointLogKey) = key.isCheckpointKey
+
+    def handleCheckpoint(payload: ByteBuffer, checkpointKey:KafkaCheckpointLogKey): Unit = {
+      val taskName = checkpointKey.getCheckpointTaskName
+
+      if (taskNames.contains(taskName)) {
+        val checkpoint = serde.fromBytes(Utils.readBytes(payload))
+
+        debug("Adding checkpoint " + checkpoint + " for taskName " + taskName)
+
+        checkpoints.put(taskName, checkpoint) // replacing any existing, older checkpoints as we go
+      }
+    }
+
+    readLog(CHECKPOINT_LOG4J_ENTRY, shouldHandleEntry, handleCheckpoint)
+
+    checkpoints.toMap /* of the immutable kind */
+  }
+
+  /**
+   * Read through the entire log, discarding checkpoints, to find the latest changelogPartitionMapping
+   *
+   * There is a lot of code duplicated from the checkpoint-reading path; it would be better to refactor
+   * this into AM-based checkpoint log reading
+   */
+  override def readChangeLogPartitionMapping(): util.Map[TaskName, java.lang.Integer] = {
+    var changelogPartitionMapping: util.Map[TaskName, java.lang.Integer] = new util.HashMap[TaskName, java.lang.Integer]()
+
+    def shouldHandleEntry(key: KafkaCheckpointLogKey) = key.isChangelogPartitionMapping
+
+    def handleChangelogPartitionMapping(payload: ByteBuffer, checkpointKey: KafkaCheckpointLogKey): Unit = {
+      changelogPartitionMapping = serde.changelogPartitionMappingFromBytes(Utils.readBytes(payload))
+
+      debug("Adding changelog partition mapping" + changelogPartitionMapping)
+    }
 
-    val checkpoint = retryBackoff.run(
+    readLog(CHANGELOG_PARTITION_MAPPING_LOG4J, shouldHandleEntry, handleChangelogPartitionMapping)
+
+    changelogPartitionMapping
+  }
+
+  /**
+   * Common code for reading both the changelog partition mapping and the checkpoints from the log
+   *
+   * @param entryType Type of entry being read, for logging
+   * @param shouldHandleEntry Predicate deciding whether a log entry's key is of the requested type
+   * @param handleEntry Code to handle an entry in the log once it's found
+   */
+  private def readLog(entryType:String, shouldHandleEntry: (KafkaCheckpointLogKey) => Boolean,
+                      handleEntry: (ByteBuffer, KafkaCheckpointLogKey) => Unit): Unit = {
+    retryBackoff.run[Unit](
       loop => {
-        // Assume checkpoint topic exists with correct partitions, since it should be verified on start.
-        // Fetch the metadata for this checkpoint topic/partition pair.
-        val metadataMap = TopicMetadataCache.getTopicMetadata(Set(checkpointTopic), systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics))
-        val metadata = metadataMap(checkpointTopic)
-        val partitionMetadata = metadata.partitionsMetadata
-          .filter(_.partitionId == partition.getPartitionId)
-          .headOption
-          .getOrElse(throw new KafkaCheckpointException("Tried to find partition information for partition %d, but it didn't exist in Kafka." format partition.getPartitionId))
-        val partitionId = partitionMetadata.partitionId
-        val leader = partitionMetadata
-          .leader
-          .getOrElse(throw new SamzaException("No leader available for topic %s" format checkpointTopic))
-
-        info("Connecting to leader %s:%d for topic %s and partition %s to fetch last checkpoint message." format (leader.host, leader.port, checkpointTopic, partitionId))
-
-        val consumer = new SimpleConsumer(
-          leader.host,
-          leader.port,
-          socketTimeout,
-          bufferSize,
-          clientId)
+        val consumer = getConsumer()
+
+        val topicAndPartition = new TopicAndPartition(checkpointTopic, 0)
+
         try {
-          val topicAndPartition = new TopicAndPartition(checkpointTopic, partitionId)
-          val offsetResponse = consumer.getOffsetsBefore(new OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))))
-            .partitionErrorAndOffsets
-            .get(topicAndPartition)
-            .getOrElse(throw new KafkaCheckpointException("Unable to find offset information for %s:%d" format (checkpointTopic, partitionId)))
+          var offset = startingOffset.getOrElse(getEarliestOffset(consumer, topicAndPartition))
 
-          // Fail or retry if there was an an issue with the offset request.
-          ErrorMapping.maybeThrowException(offsetResponse.error)
+          info("Got offset %s for topic %s and partition 0. Attempting to fetch messages for %s." format(offset, checkpointTopic, entryType))
 
-          val offset = offsetResponse
-            .offsets
-            .headOption
-            .getOrElse(throw new KafkaCheckpointException("Got response, but no offsets defined for %s:%d" format (checkpointTopic, partitionId)))
+          val latestOffset = getOffset(consumer, topicAndPartition, OffsetRequest.LatestTime)
 
-          info("Got offset %s for topic %s and partition %s. Attempting to fetch message." format (offset, checkpointTopic, partitionId))
+          info("Get latest offset %s for topic %s and partition 0." format(latestOffset, checkpointTopic))
 
-          if (offset <= 0) {
-            info("Got offset 0 (no messages in checkpoint topic) for topic %s and partition %s, so returning null. If you expected the checkpoint topic to have messages, you're probably going to lose data." format (checkpointTopic, partition))
-            return null
+          if (offset < 0) {
+            info("Got offset 0 (no messages in %s) for topic %s and partition 0, so returning empty collection. If you expected the checkpoint topic to have messages, you're probably going to lose data." format (entryType, checkpointTopic))
+            return
           }
 
-          val request = new FetchRequestBuilder()
-            // Kafka returns 1 greater than the offset of the last message in
-            // the topic, so subtract one to fetch the last message.
-            .addFetch(checkpointTopic, partitionId, offset - 1, fetchSize)
-            .maxWait(500)
-            .minBytes(1)
-            .clientId(clientId)
-            .build
-          val messageSet = consumer.fetch(request)
-          if (messageSet.hasError) {
-            warn("Got error code from broker for %s: %s" format (checkpointTopic, messageSet.errorCode(checkpointTopic, partitionId)))
-            val errorCode = messageSet.errorCode(checkpointTopic, partitionId)
-            if (ErrorMapping.OffsetOutOfRangeCode.equals(errorCode)) {
-              warn("Got an offset out of range exception while getting last checkpoint for topic %s and partition %s, so returning a null offset to the KafkaConsumer. Let it decide what to do based on its autooffset.reset setting." format (checkpointTopic, partitionId))
-              return null
+          while (offset < latestOffset) {
+            val request = new FetchRequestBuilder()
+              .addFetch(checkpointTopic, 0, offset, fetchSize)
+              .maxWait(500)
+              .minBytes(1)
+              .clientId(clientId)
+              .build
+
+            val fetchResponse = consumer.fetch(request)
+            if (fetchResponse.hasError) {
+              warn("Got error code from broker for %s: %s" format(checkpointTopic, fetchResponse.errorCode(checkpointTopic, 0)))
+              val errorCode = fetchResponse.errorCode(checkpointTopic, 0)
+              if (ErrorMapping.OffsetOutOfRangeCode.equals(errorCode)) {
+                warn("Got an offset out of range exception while getting last entry in %s for topic %s and partition 0, so returning a null offset to the KafkaConsumer. Let it decide what to do based on its autooffset.reset setting." format (entryType, checkpointTopic))
+                return
+              }
+              ErrorMapping.maybeThrowException(errorCode)
             }
-            ErrorMapping.maybeThrowException(errorCode)
-          }
-          val messages = messageSet.messageSet(checkpointTopic, partitionId).toList
 
-          if (messages.length != 1) {
-            throw new KafkaCheckpointException("Something really unexpected happened. Got %s "
-              + "messages back when fetching from checkpoint topic %s and partition %s. "
-              + "Expected one message. It would be unsafe to go on without the latest checkpoint, "
-              + "so failing." format (messages.length, checkpointTopic, partition))
-          }
+            for (response <- fetchResponse.messageSet(checkpointTopic, 0)) {
+              offset = response.nextOffset
+              startingOffset = Some(offset) // For next time we call
+
+              if (!response.message.hasKey) {
+                throw new KafkaCheckpointException("Encountered message without key.")
+              }
 
-          // Some back bending to go from message to checkpoint.
-          val checkpoint = serde.fromBytes(Utils.readBytes(messages(0).message.payload))
-          loop.done
-          checkpoint
+              val checkpointKey = KafkaCheckpointLogKey.fromBytes(Utils.readBytes(response.message.key))
+
+              if (!shouldHandleEntry(checkpointKey)) {
+                debug("Skipping " + entryType + " entry with key " + checkpointKey)
+              } else {
+                handleEntry(response.message.payload, checkpointKey)
+              }
+            }
+          }
         } finally {
-          consumer.close
+          consumer.close()
         }
+
+        loop.done
+        Unit
       },
 
       (exception, loop) => {
         exception match {
-          case e: InvalidMessageException => throw new KafkaCheckpointException ("Got InvalidMessageException from Kafka, which is unrecoverable, so fail the samza job", e)
-          case e: InvalidMessageSizeException => throw new KafkaCheckpointException ("Got InvalidMessageSizeException from Kafka, which is unrecoverable, so fail the samza job", e)
-          case e: UnknownTopicOrPartitionException => throw new KafkaCheckpointException ("Got UnknownTopicOrPartitionException from Kafka, which is unrecoverable, so fail the samza job", e)
+          case e: InvalidMessageException => throw new KafkaCheckpointException("Got InvalidMessageException from Kafka, which is unrecoverable, so fail the samza job", e)
+          case e: InvalidMessageSizeException => throw new KafkaCheckpointException("Got InvalidMessageSizeException from Kafka, which is unrecoverable, so fail the samza job", e)
+          case e: UnknownTopicOrPartitionException => throw new KafkaCheckpointException("Got UnknownTopicOrPartitionException from Kafka, which is unrecoverable, so fail the samza job", e)
           case e: KafkaCheckpointException => throw e
           case e: Exception =>
-            warn("While trying to read last checkpoint for topic %s and partition %s: %s. Retrying." format (checkpointTopic, partition, e))
+            warn("While trying to read last %s entry for topic %s and partition 0: %s. Retrying." format(entryType, checkpointTopic, e))
             debug("Exception detail:", e)
         }
       }
-    ).getOrElse(throw new SamzaException("Failed to get checkpoint for partition %s" format partition.getPartitionId))
+    ).getOrElse(throw new SamzaException("Failed to get entries for " + entryType + " from topic " + checkpointTopic))
 
-    info("Got checkpoint state for partition %s: %s" format (partition.getPartitionId, checkpoint))
-    checkpoint
   }
 
   def start {
-    createTopic
+    create
     validateTopic
   }
 
-  def register(partition: Partition) {
-    partitions += partition
+  def register(taskName: TaskName) {
+    debug("Adding taskName " + taskName + " to " + this)
+    taskNames += taskName
   }
 
   def stop = {
-    if(producer != null) {
+    if (producer != null) {
       producer.close
     }
   }
 
-  private def createTopic {
-    info("Attempting to create checkpoint topic %s with %s partitions." format (checkpointTopic, totalPartitions))
+  def create {
+    info("Attempting to create checkpoint topic %s." format checkpointTopic)
     retryBackoff.run(
       loop => {
         val zkClient = connectZk()
@@ -223,7 +354,7 @@ class KafkaCheckpointManager(
           AdminUtils.createTopic(
             zkClient,
             checkpointTopic,
-            totalPartitions,
+            1,
             replicationFactor)
         } finally {
           zkClient.close
@@ -239,7 +370,7 @@ class KafkaCheckpointManager(
             info("Checkpoint topic %s already exists." format checkpointTopic)
             loop.done
           case e: Exception =>
-            warn("Failed to create topic %s: %s. Retrying." format (checkpointTopic, e))
+            warn("Failed to create topic %s: %s. Retrying." format(checkpointTopic, e))
             debug("Exception detail:", e)
         }
       }
@@ -255,8 +386,8 @@ class KafkaCheckpointManager(
         ErrorMapping.maybeThrowException(topicMetadata.errorCode)
 
         val partitionCount = topicMetadata.partitionsMetadata.length
-        if (partitionCount != totalPartitions) {
-          throw new KafkaCheckpointException("Checkpoint topic validation failed for topic %s because partition count %s did not match expected partition count %s." format (checkpointTopic, topicMetadata.partitionsMetadata.length, totalPartitions))
+        if (partitionCount != 1) {
+          throw new KafkaCheckpointException("Checkpoint topic validation failed for topic %s because partition count %s did not match expected partition count of 1." format(checkpointTopic, topicMetadata.partitionsMetadata.length))
         }
 
         info("Successfully validated checkpoint topic %s." format checkpointTopic)
@@ -267,14 +398,19 @@ class KafkaCheckpointManager(
         exception match {
           case e: KafkaCheckpointException => throw e
           case e: Exception =>
-            warn("While trying to validate topic %s: %s. Retrying." format (checkpointTopic, e))
+            warn("While trying to validate topic %s: %s. Retrying." format(checkpointTopic, e))
             debug("Exception detail:", e)
         }
       }
     )
   }
 
-  override def toString = "KafkaCheckpointManager [systemName=%s, checkpointTopic=%s]" format (systemName, checkpointTopic)
+  override def toString = "KafkaCheckpointManager [systemName=%s, checkpointTopic=%s]" format(systemName, checkpointTopic)
+}
+
+object KafkaCheckpointManager {
+  val CHECKPOINT_LOG4J_ENTRY = "checkpoint log"
+  val CHANGELOG_PARTITION_MAPPING_LOG4J = "changelog partition mapping"
 }
 
 /**

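A minimal usage sketch of the reworked TaskName-keyed API, mirroring the test changes later in this patch (construction of kcm via KafkaCheckpointManagerFactory is elided):

    import org.apache.samza.Partition
    import org.apache.samza.checkpoint.Checkpoint
    import org.apache.samza.container.TaskName
    import org.apache.samza.system.SystemStreamPartition
    import scala.collection.JavaConversions._

    val taskName = new TaskName("Partition 0")
    kcm.register(taskName)     // must precede readLastCheckpoint for this TaskName
    kcm.start                  // creates and validates the single-partition topic
    val ssp = new SystemStreamPartition("kafka", "topic", new Partition(0))
    kcm.writeCheckpoint(taskName, new Checkpoint(Map(ssp -> "123")))
    val cp = kcm.readLastCheckpoint(taskName)  // newest message keyed to taskName, or null
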
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
index cb6dbdf..087c6ad 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
@@ -19,26 +19,28 @@
 
 package org.apache.samza.checkpoint.kafka
 
-import org.apache.samza.config.{ KafkaConfig, Config }
-import org.apache.samza.SamzaException
-import java.util.Properties
+import grizzled.slf4j.Logging
 import kafka.producer.Producer
-import org.apache.samza.config.SystemConfig.Config2System
-import org.apache.samza.config.StreamConfig.Config2Stream
-import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.config.KafkaConfig.Config2Kafka
+import kafka.utils.ZKStringSerializer
+import org.I0Itec.zkclient.ZkClient
+import org.apache.samza.SamzaException
+import org.apache.samza.checkpoint.CheckpointManager
+import org.apache.samza.checkpoint.CheckpointManagerFactory
+import org.apache.samza.config.Config
 import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.Partition
-import grizzled.slf4j.Logging
+import org.apache.samza.config.KafkaConfig.Config2Kafka
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.util.{ KafkaUtil, ClientUtilTopicMetadataStore }
-import org.apache.samza.util.Util
-import org.I0Itec.zkclient.ZkClient
-import kafka.utils.ZKStringSerializer
-import org.apache.samza.checkpoint.CheckpointManagerFactory
-import org.apache.samza.checkpoint.CheckpointManager
 
+object KafkaCheckpointManagerFactory {
+  /**
+   * Version number to track the format of the checkpoint log
+   */
+  val CHECKPOINT_LOG_VERSION_NUMBER = 1
+}
 class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Logging {
+  import KafkaCheckpointManagerFactory._
+
   def getCheckpointManager(config: Config, registry: MetricsRegistry): CheckpointManager = {
     val clientId = KafkaUtil.getClientId("samza-checkpoint-manager", config)
     val systemName = config
@@ -60,7 +62,7 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
     val fetchSize = consumerConfig.fetchMessageMaxBytes // must be > buffer size
 
     val connectProducer = () => {
-      new Producer[Partition, Array[Byte]](producerConfig)
+      new Producer[Array[Byte], Array[Byte]](producerConfig)
     }
     val zkConnect = Option(consumerConfig.zkConnect)
       .getOrElse(throw new SamzaException("no zookeeper.connect defined in config"))
@@ -73,24 +75,24 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
       .getOrElse(throw new SamzaException("No broker list defined in config for %s." format systemName))
     val metadataStore = new ClientUtilTopicMetadataStore(brokersListString, clientId, socketTimeout)
     val checkpointTopic = getTopic(jobName, jobId)
-    
-    // This is a reasonably expensive operation and the TaskInstance already knows the answer. Should use that info.
-    val totalPartitions = Util.getInputStreamPartitions(config).map(_.getPartition).toSet.size
+
+    // Find out the SSPGrouperFactory class so it can be included/verified in the key
+    val systemStreamPartitionGrouperFactoryString = config.getSystemStreamPartitionGrouperFactory
 
     new KafkaCheckpointManager(
       clientId,
       checkpointTopic,
       systemName,
-      totalPartitions,
       replicationFactor,
       socketTimeout,
       bufferSize,
       fetchSize,
       metadataStore,
       connectProducer,
-      connectZk)
+      connectZk,
+      systemStreamPartitionGrouperFactoryString)
   }
 
   private def getTopic(jobName: String, jobId: String) =
-    "__samza_checkpoint_%s_%s" format (jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-"))
+    "__samza_checkpoint_ver_%d_for_%s_%s" format (CHECKPOINT_LOG_VERSION_NUMBER, jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-"))
 }
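
For example, under the new naming scheme a job named "my-job" with job id "1" gets the checkpoint topic __samza_checkpoint_ver_1_for_my-job_1 (underscores in the job name or id are rewritten to dashes).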

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
index 8a8834f..9553050 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
@@ -19,8 +19,6 @@
 
 package org.apache.samza.system.kafka
 
-import scala.annotation.implicitNotFound
-
 import grizzled.slf4j.Logging
 import kafka.api.TopicMetadata
 import kafka.common.ErrorMapping

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala
new file mode 100644
index 0000000..7a23041
--- /dev/null
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.checkpoint.kafka
+
+import org.apache.samza.container.TaskName
+import org.junit.Assert._
+import org.junit.{Before, Test}
+import org.apache.samza.SamzaException
+
+class TestKafkaCheckpointLogKey {
+  @Before
+  def setSSPGrouperFactoryString() {
+    KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString("hello")
+  }
+
+  @Test
+  def checkpointKeySerializationRoundTrip() {
+    val checkpointKey = KafkaCheckpointLogKey.getCheckpointKey(new TaskName("TN"))
+    val asBytes = checkpointKey.toBytes()
+    val backFromBytes = KafkaCheckpointLogKey.fromBytes(asBytes)
+
+    assertEquals(checkpointKey, backFromBytes)
+    assertNotSame(checkpointKey, backFromBytes)
+  }
+
+  @Test
+  def changelogPartitionMappingKeySerializationRoundTrip() {
+    val key = KafkaCheckpointLogKey.getChangelogPartitionMappingKey()
+    val asBytes = key.toBytes()
+    val backFromBytes = KafkaCheckpointLogKey.fromBytes(asBytes)
+
+    assertEquals(key, backFromBytes)
+    assertNotSame(key, backFromBytes)
+  }
+
+  @Test
+  def differingSSPGrouperFactoriesCauseException() {
+
+    val checkpointKey = KafkaCheckpointLogKey.getCheckpointKey(new TaskName("TN"))
+
+    val asBytes = checkpointKey.toBytes()
+
+    KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString("goodbye")
+
+    var gotException = false
+    try {
+      KafkaCheckpointLogKey.fromBytes(asBytes)
+    } catch {
+      case se:SamzaException => assertEquals(new DifferingSystemStreamPartitionGrouperFactoryValues("hello", "goodbye").getMessage(), se.getCause.getMessage)
+                                gotException = true
+    }
+
+    assertTrue("Should have had an exception since ssp grouper factories didn't match", gotException)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 92ac61e..cddee13 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -19,11 +19,9 @@
 
 package org.apache.samza.checkpoint.kafka
 
-import org.I0Itec.zkclient.ZkClient
-import org.junit.Assert._
-import org.junit.AfterClass
-import org.junit.BeforeClass
-import org.junit.Test
+import kafka.common.InvalidMessageSizeException
+import kafka.common.UnknownTopicOrPartitionException
+import kafka.message.InvalidMessageException
 import kafka.producer.Producer
 import kafka.producer.ProducerConfig
 import kafka.server.KafkaConfig
@@ -31,20 +29,20 @@ import kafka.server.KafkaServer
 import kafka.utils.TestUtils
 import kafka.utils.TestZKUtils
 import kafka.utils.Utils
+import kafka.utils.ZKStringSerializer
 import kafka.zk.EmbeddedZookeeper
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.Partition
-import scala.collection._
-import scala.collection.JavaConversions._
-import org.apache.samza.util.{ ClientUtilTopicMetadataStore, TopicMetadataStore }
-import org.apache.samza.config.MapConfig
+import org.I0Itec.zkclient.ZkClient
 import org.apache.samza.checkpoint.Checkpoint
+import org.apache.samza.container.TaskName
 import org.apache.samza.serializers.CheckpointSerde
-import org.apache.samza.system.SystemStream
-import kafka.utils.ZKStringSerializer
-import kafka.message.InvalidMessageException
-import kafka.common.InvalidMessageSizeException
-import kafka.common.UnknownTopicOrPartitionException
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.util.{ ClientUtilTopicMetadataStore, TopicMetadataStore }
+import org.apache.samza.{SamzaException, Partition}
+import org.junit.Assert._
+import org.junit.{AfterClass, BeforeClass, Test}
+import scala.collection.JavaConversions._
+import scala.collection._
+import org.apache.samza.container.systemstreampartition.groupers.GroupByPartitionFactory
 
 object TestKafkaCheckpointManager {
   val zkConnect: String = TestZKUtils.zookeeperConnect
@@ -72,14 +70,16 @@ object TestKafkaCheckpointManager {
   config.put("request.required.acks", "-1")
   val producerConfig = new ProducerConfig(config)
   val partition = new Partition(0)
-  val cp1 = new Checkpoint(Map(new SystemStream("kafka", "topic") -> "123"))
-  val cp2 = new Checkpoint(Map(new SystemStream("kafka", "topic") -> "12345"))
+  val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "123"))
+  val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "12345"))
   var zookeeper: EmbeddedZookeeper = null
   var server1: KafkaServer = null
   var server2: KafkaServer = null
   var server3: KafkaServer = null
   var metadataStore: TopicMetadataStore = null
 
+  val systemStreamPartitionGrouperFactoryString = classOf[GroupByPartitionFactory].getCanonicalName
+
   @BeforeClass
   def beforeSetupServers {
     zookeeper = new EmbeddedZookeeper(zkConnect)
@@ -108,42 +108,45 @@ class TestKafkaCheckpointManager {
   import TestKafkaCheckpointManager._
 
   @Test
-  def testCheckpointShouldBeNullIfcheckpointTopicDoesNotExistShouldBeCreatedOnWriteAndShouldBeReadableAfterWrite {
+  def testCheckpointShouldBeNullIfCheckpointTopicDoesNotExistShouldBeCreatedOnWriteAndShouldBeReadableAfterWrite {
     val kcm = getKafkaCheckpointManager
-    kcm.register(partition)
+    val taskName = new TaskName(partition.toString)
+    kcm.register(taskName)
     kcm.start
-    var readCp = kcm.readLastCheckpoint(partition)
+    var readCp = kcm.readLastCheckpoint(taskName)
     // read before topic exists should result in a null checkpoint
-    assert(readCp == null)
+    assertNull(readCp)
     // create topic the first time around
-    kcm.writeCheckpoint(partition, cp1)
-    readCp = kcm.readLastCheckpoint(partition)
-    assert(cp1.equals(readCp))
+    kcm.writeCheckpoint(taskName, cp1)
+    readCp = kcm.readLastCheckpoint(taskName)
+    assertEquals(cp1, readCp)
     // should get an exception if partition doesn't exist
     try {
-      readCp = kcm.readLastCheckpoint(new Partition(1))
+      readCp = kcm.readLastCheckpoint(new TaskName(new Partition(1).toString))
       fail("Expected a SamzaException, since only one partition (partition 0) should exist.")
     } catch {
-      case e: Exception => None // expected
+      case e: SamzaException => None // expected
+      case _: Exception => fail("Expected a SamzaException, since only one partition (partition 0) should exist.")
     }
     // writing a second message should work, too
-    kcm.writeCheckpoint(partition, cp2)
-    readCp = kcm.readLastCheckpoint(partition)
-    assert(cp2.equals(readCp))
+    kcm.writeCheckpoint(taskName, cp2)
+    readCp = kcm.readLastCheckpoint(taskName)
+    assertEquals(cp2, readCp)
     kcm.stop
   }
 
   @Test
-  def testUnrecovableKafkaErrorShouldThrowKafkaCheckpointManagerException {
+  def testUnrecoverableKafkaErrorShouldThrowKafkaCheckpointManagerException {
     val exceptions = List("InvalidMessageException", "InvalidMessageSizeException", "UnknownTopicOrPartitionException")
     exceptions.foreach { exceptionName =>
       val kcm = getKafkaCheckpointManagerWithInvalidSerde(exceptionName)
-      kcm.register(partition)
+      val taskName = new TaskName(partition.toString)
+      kcm.register(taskName)
       kcm.start
-      kcm.writeCheckpoint(partition, cp1)
+      kcm.writeCheckpoint(taskName, cp1)
       // because the serde will throw unrecoverable errors, it should result in a KafkaCheckpointException
       try {
-        val readCpInvalide = kcm.readLastCheckpoint(partition)
+        kcm.readLastCheckpoint(taskName)
         fail("Expected a KafkaCheckpointException.")
       } catch {
         case e: KafkaCheckpointException => None
@@ -156,28 +159,28 @@ class TestKafkaCheckpointManager {
     clientId = "some-client-id",
     checkpointTopic = "checkpoint-topic",
     systemName = "kafka",
-    totalPartitions = 1,
     replicationFactor = 3,
     socketTimeout = 30000,
     bufferSize = 64 * 1024,
     fetchSize = 300 * 1024,
     metadataStore = metadataStore,
-    connectProducer = () => new Producer[Partition, Array[Byte]](producerConfig),
-    connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer))
+    connectProducer = () => new Producer[Array[Byte], Array[Byte]](producerConfig),
+    connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer),
+    systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString)
 
  // Inject a serde: Kafka exceptions will be thrown when serde.fromBytes is called
   private def getKafkaCheckpointManagerWithInvalidSerde(exception: String) = new KafkaCheckpointManager(
     clientId = "some-client-id-invalid-serde",
     checkpointTopic = "checkpoint-topic-invalid-serde",
     systemName = "kafka",
-    totalPartitions = 1,
     replicationFactor = 3,
     socketTimeout = 30000,
     bufferSize = 64 * 1024,
     fetchSize = 300 * 1024,
     metadataStore = metadataStore,
-    connectProducer = () => new Producer[Partition, Array[Byte]](producerConfig),
+    connectProducer = () => new Producer[Array[Byte], Array[Byte]](producerConfig),
     connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer),
+    systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
     serde = new InvalideSerde(exception))
 
   class InvalideSerde(exception: String) extends CheckpointSerde {

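As the updated test shows, the checkpoint manager is now registered and queried per TaskName rather than per Partition, and it takes the grouper factory class name in place of a totalPartitions count. A hedged sketch of the new call pattern; the manager itself is assumed to be built as in getKafkaCheckpointManager above:

    import org.apache.samza.Partition
    import org.apache.samza.checkpoint.Checkpoint
    import org.apache.samza.checkpoint.kafka.KafkaCheckpointManager
    import org.apache.samza.container.TaskName
    import org.apache.samza.system.SystemStreamPartition
    import scala.collection.JavaConversions._

    def checkpointRoundTrip(kcm: KafkaCheckpointManager) {
      val taskName = new TaskName(new Partition(0).toString)
      val cp = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", new Partition(0)) -> "123"))

      kcm.register(taskName)                        // was kcm.register(partition)
      kcm.start
      kcm.writeCheckpoint(taskName, cp)
      val latest = kcm.readLastCheckpoint(taskName) // would be null before the first write created the topic
      kcm.stop
    }
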
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index 6be9732..be1670c 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -22,10 +22,8 @@
 package org.apache.samza.system.kafka
 
 import org.junit.Assert._
-import org.junit.Test
+import org.junit.{Test, BeforeClass, AfterClass}
 import kafka.zk.EmbeddedZookeeper
-import org.junit.BeforeClass
-import org.junit.AfterClass
 import org.apache.samza.util.ClientUtilTopicMetadataStore
 import org.I0Itec.zkclient.ZkClient
 import kafka.admin.AdminUtils

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala b/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
index 751fe4c..6652f6b 100644
--- a/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
+++ b/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
@@ -19,13 +19,9 @@
 
 package org.apache.samza.storage.kv
 
-import java.nio.ByteBuffer
 import org.iq80.leveldb._
-import org.fusesource.leveldbjni.internal.NativeComparator
 import org.fusesource.leveldbjni.JniDBFactory._
 import java.io._
-import java.util.Iterator
-import java.lang.Iterable
 import org.apache.samza.config.Config
 import org.apache.samza.container.SamzaContainerContext
 import grizzled.slf4j.{ Logger, Logging }
@@ -39,8 +35,8 @@ object LevelDbKeyValueStore {
     val options = new Options
 
     // Cache size and write buffer size are specified on a per-container basis.
-    options.cacheSize(cacheSize / containerContext.partitions.size)
-    options.writeBufferSize((writeBufSize / containerContext.partitions.size).toInt)
+    options.cacheSize(cacheSize / containerContext.taskNames.size)
+    options.writeBufferSize((writeBufSize / containerContext.taskNames.size).toInt)
     options.blockSize(storeConfig.getInt("leveldb.block.size.bytes", 4096))
     options.compressionType(
       storeConfig.get("leveldb.compression", "snappy") match {

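One consequence of the sizing change above: the per-container cache and write-buffer budgets are now split evenly across the container's task names rather than its partitions. A small illustrative calculation (the budget figures are hypothetical, not Samza defaults):

    // Hypothetical per-container budgets, divided as in the options above.
    val cacheSize: Long = 64L * 1024 * 1024    // 64 MB cache budget
    val writeBufSize: Long = 32L * 1024 * 1024 // 32 MB write-buffer budget
    val numTaskNames = 8                       // task names assigned to this container

    val perStoreCache = cacheSize / numTaskNames               // 8 MB per store
    val perStoreWriteBuf = (writeBufSize / numTaskNames).toInt // 4 MB per store
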
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java b/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
index 222c130..f20bb7f 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
@@ -20,6 +20,7 @@
 package org.apache.samza.test.integration.join;
 
 import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskName;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.OutgoingMessageEnvelope;
@@ -45,12 +46,12 @@ public class Emitter implements StreamTask, InitableTask, WindowableTask {
   
   private KeyValueStore<String, String> state;
   private int max;
-  private String partition;
+  private TaskName taskName;
 
   @Override
   public void init(Config config, TaskContext context) {
     this.state = (KeyValueStore<String, String>) context.getStore("emitter-state");
-    this.partition = Integer.toString(context.getPartition().getPartitionId());
+    this.taskName = context.getTaskName();
     this.max = config.getInt("count");
   }
 
@@ -79,7 +80,7 @@ public class Emitter implements StreamTask, InitableTask, WindowableTask {
     }
     int counter = getInt(COUNT);
     if(counter < max) {
-      OutgoingMessageEnvelope envelope = new OutgoingMessageEnvelope(new SystemStream("kafka", "emitted"), Integer.toString(counter), epoch + "-" + partition);
+      OutgoingMessageEnvelope envelope = new OutgoingMessageEnvelope(new SystemStream("kafka", "emitted"), Integer.toString(counter), epoch + "-" + taskName);
       collector.send(envelope);
       this.state.put(COUNT, Integer.toString(getInt(COUNT) + 1));
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
index c0ac5dd..7d0b8db 100644
--- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
+++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
@@ -22,7 +22,7 @@ package org.apache.samza.test.performance
 import grizzled.slf4j.Logging
 import org.apache.samza.config.Config
 import org.apache.samza.config.StorageConfig._
-import org.apache.samza.container.SamzaContainerContext
+import org.apache.samza.container.{TaskName, SamzaContainerContext}
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.storage.kv.KeyValueStore
 import org.apache.samza.storage.kv.KeyValueStorageEngine
@@ -34,7 +34,6 @@ import org.apache.samza.serializers.ByteSerde
 import org.apache.samza.Partition
 import org.apache.samza.SamzaException
 import java.io.File
-import scala.collection.JavaConversions._
 import java.util.UUID
 
 /**
@@ -79,7 +78,9 @@ object TestKeyValuePerformance extends Logging {
     val numLoops = config.getInt("test.num.loops", 100)
     val messagesPerBatch = config.getInt("test.messages.per.batch", 10000)
     val messageSizeBytes = config.getInt("test.message.size.bytes", 200)
-    val partitions = (0 until partitionCount).map(new Partition(_))
+    val taskNames = new java.util.ArrayList[TaskName]()
+
+    (0 until partitionCount).foreach(p => taskNames.add(new TaskName(new Partition(p).toString)))
 
     info("Using partition count: %s" format partitionCount)
     info("Using num loops: %s" format numLoops)
@@ -109,7 +110,7 @@ object TestKeyValuePerformance extends Logging {
         new ReadableCollector,
         new MetricsRegistryMap,
         null,
-        new SamzaContainerContext("test", config, partitions))
+        new SamzaContainerContext("test", config, taskNames))
 
       val db = if (!engine.isInstanceOf[KeyValueStorageEngine[_, _]]) {
         throw new SamzaException("This test can only run with KeyValueStorageEngine configured as store factory.")

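With scala.collection.JavaConversions dropped from the imports here, the task name list is built as a Java collection directly. For comparison, an equivalent construction via JavaConverters; this is only a sketch, and assumes the container context accepts any java.util.List:

    import scala.collection.JavaConverters._
    import org.apache.samza.Partition
    import org.apache.samza.container.TaskName

    val partitionCount = 4 // illustrative
    val taskNames: java.util.List[TaskName] =
      (0 until partitionCount).map(p => new TaskName(new Partition(p).toString)).asJava
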
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
index dc44a99..3ed8b5c 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
@@ -19,64 +19,55 @@
 
 package org.apache.samza.test.integration
 
-import org.apache.samza.task.StreamTask
-import org.apache.samza.task.TaskContext
-import org.apache.samza.task.InitableTask
-import org.apache.samza.config.Config
-import scala.collection.JavaConversions._
-import org.apache.samza.task.TaskCoordinator
-import org.apache.samza.task.MessageCollector
-import org.apache.samza.system.IncomingMessageEnvelope
-import org.apache.samza.checkpoint.Checkpoint
-import org.junit.BeforeClass
-import org.junit.AfterClass
-import kafka.zk.EmbeddedZookeeper
-import kafka.utils.TestUtils
-import org.apache.samza.system.SystemStream
-import kafka.utils.TestZKUtils
-import kafka.server.KafkaConfig
-import org.I0Itec.zkclient.ZkClient
+import java.util.Properties
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
+import kafka.admin.AdminUtils
+import kafka.common.ErrorMapping
+import kafka.consumer.Consumer
+import kafka.consumer.ConsumerConfig
+import kafka.message.MessageAndMetadata
+import kafka.producer.KeyedMessage
+import kafka.producer.Producer
 import kafka.producer.ProducerConfig
+import kafka.server.KafkaConfig
 import kafka.server.KafkaServer
+import kafka.utils.TestUtils
+import kafka.utils.TestZKUtils
 import kafka.utils.Utils
-import org.apache.samza.storage.kv.KeyValueStore
-import org.apache.samza.util._
-import org.junit.Test
-import kafka.admin.AdminUtils
-import kafka.common.ErrorMapping
-import org.junit.Assert._
 import kafka.utils.ZKStringSerializer
-import scala.collection.mutable.ArrayBuffer
-import org.apache.samza.job.local.LocalJobFactory
-import org.apache.samza.job.ApplicationStatus
-import java.util.concurrent.CountDownLatch
-import org.apache.samza.job.local.ThreadJob
-import org.apache.samza.util.TopicMetadataStore
-import org.apache.samza.util.ClientUtilTopicMetadataStore
+import kafka.zk.EmbeddedZookeeper
+import org.I0Itec.zkclient.ZkClient
+import org.apache.samza.Partition
+import org.apache.samza.checkpoint.Checkpoint
+import org.apache.samza.config.Config
 import org.apache.samza.config.MapConfig
+import org.apache.samza.container.TaskName
+import org.apache.samza.job.ApplicationStatus
+import org.apache.samza.job.StreamJob
+import org.apache.samza.job.local.LocalJobFactory
+import org.apache.samza.storage.kv.KeyValueStore
 import org.apache.samza.system.kafka.TopicMetadataCache
-import org.apache.samza.container.SamzaContainer
+import org.apache.samza.system.{SystemStreamPartition, IncomingMessageEnvelope}
+import org.apache.samza.task.InitableTask
+import org.apache.samza.task.MessageCollector
+import org.apache.samza.task.StreamTask
+import org.apache.samza.task.TaskContext
+import org.apache.samza.task.TaskCoordinator
+import org.apache.samza.task.TaskCoordinator.RequestScope
+import org.apache.samza.util.ClientUtilTopicMetadataStore
+import org.apache.samza.util.TopicMetadataStore
+import org.junit.Assert._
+import org.junit.{BeforeClass, AfterClass, Test}
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 import scala.collection.mutable.SynchronizedMap
-import org.apache.samza.Partition
-import java.util.concurrent.TimeUnit
-import kafka.producer.Producer
-import kafka.producer.KeyedMessage
-import java.util.concurrent.Semaphore
-import java.util.concurrent.CyclicBarrier
-import kafka.consumer.Consumer
-import kafka.consumer.ConsumerConfig
-import java.util.Properties
-import java.util.concurrent.Executors
-import kafka.message.MessageAndOffset
-import kafka.message.MessageAndMetadata
-import org.apache.samza.job.StreamJob
-import org.apache.samza.task.TaskCoordinator.RequestScope
 
 object TestStatefulTask {
   val INPUT_TOPIC = "input"
   val STATE_TOPIC = "mystore"
-  val TOTAL_PARTITIONS = 1
+  val TOTAL_TASK_NAMES = 1
   val REPLICATION_FACTOR = 3
 
   val zkConnect: String = TestZKUtils.zookeeperConnect
@@ -102,8 +93,8 @@ object TestStatefulTask {
   config.put("serializer.class", "kafka.serializer.StringEncoder");
   val producerConfig = new ProducerConfig(config)
   var producer: Producer[String, String] = null
-  val cp1 = new Checkpoint(Map(new SystemStream("kafka", "topic") -> "123"))
-  val cp2 = new Checkpoint(Map(new SystemStream("kafka", "topic") -> "12345"))
+  val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", new Partition(0)) -> "123"))
+  val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", new Partition(0)) -> "12345"))
   var zookeeper: EmbeddedZookeeper = null
   var server1: KafkaServer = null
   var server2: KafkaServer = null
@@ -128,13 +119,13 @@ object TestStatefulTask {
     AdminUtils.createTopic(
       zkClient,
       INPUT_TOPIC,
-      TOTAL_PARTITIONS,
+      TOTAL_TASK_NAMES,
       REPLICATION_FACTOR)
 
     AdminUtils.createTopic(
       zkClient,
       STATE_TOPIC,
-      TOTAL_PARTITIONS,
+      TOTAL_TASK_NAMES,
       REPLICATION_FACTOR)
   }
 
@@ -221,7 +212,14 @@ class TestStatefulTask {
     "systems.kafka.consumer.auto.offset.reset" -> "smallest", // applies to an empty topic
     "systems.kafka.samza.msg.serde" -> "string",
     "systems.kafka.consumer.zookeeper.connect" -> zkConnect,
-    "systems.kafka.producer.metadata.broker.list" -> ("localhost:%s" format port1))
+    "systems.kafka.producer.metadata.broker.list" -> ("localhost:%s" format port1),
+    // Since the task uses state, it needs a checkpoint manager.
+    "task.checkpoint.factory" -> "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory",
+    "task.checkpoint.system" -> "kafka",
+    "task.checkpoint.replication.factor" -> "1",
+    // However, don't have the inputs use the checkpoint manager
+    // since the second part of the test expects to replay the input streams.
+    "systems.kafka.streams.input.samza.reset.offset" -> "true")
 
   @Test
   def testShouldStartAndRestore {
@@ -278,7 +276,7 @@ class TestStatefulTask {
       count += 1
     }
 
-    assertTrue(count < 100)
+    assertTrue("Timed out waiting to receive messages. Received thus far: " + task.received.size, count < 100)
 
     // Reset the count down latch after the 4 messages come in.
     task.awaitMessage
@@ -328,8 +326,7 @@ class TestStatefulTask {
     TestTask.awaitTaskRegistered
     val tasks = TestTask.tasks
 
-    // Should only have one partition.
-    assertEquals(1, tasks.size)
+    assertEquals("Should only have a single task name registered", 1, tasks.size)
 
     val task = tasks.values.toList.head
 
@@ -392,25 +389,25 @@ class TestStatefulTask {
 }
 
 object TestTask {
-  val tasks = new HashMap[Partition, TestTask] with SynchronizedMap[Partition, TestTask]
-  @volatile var allTasksRegistered = new CountDownLatch(TestStatefulTask.TOTAL_PARTITIONS)
+  val tasks = new HashMap[TaskName, TestTask] with SynchronizedMap[TaskName, TestTask]
+  @volatile var allTasksRegistered = new CountDownLatch(TestStatefulTask.TOTAL_TASK_NAMES)
 
   /**
    * Static method that tasks can use to register themselves with. Useful so
    * we don't have to sneak into the ThreadJob/SamzaContainer to get our test
    * tasks.
    */
-  def register(partition: Partition, task: TestTask) {
-    tasks += partition -> task
+  def register(taskName: TaskName, task: TestTask) {
+    tasks += taskName -> task
     allTasksRegistered.countDown
   }
 
   def awaitTaskRegistered {
     allTasksRegistered.await(60, TimeUnit.SECONDS)
     assertEquals(0, allTasksRegistered.getCount)
-    assertEquals(TestStatefulTask.TOTAL_PARTITIONS, tasks.size)
+    assertEquals(TestStatefulTask.TOTAL_TASK_NAMES, tasks.size)
     // Reset the registered latch, so we can use it again every time we start a new job.
-    TestTask.allTasksRegistered = new CountDownLatch(TestStatefulTask.TOTAL_PARTITIONS)
+    TestTask.allTasksRegistered = new CountDownLatch(TestStatefulTask.TOTAL_TASK_NAMES)
   }
 }
 
@@ -422,7 +419,7 @@ class TestTask extends StreamTask with InitableTask {
   var gotMessage = new CountDownLatch(1)
 
   def init(config: Config, context: TaskContext) {
-    TestTask.register(context.getPartition, this)
+    TestTask.register(context.getTaskName, this)
     store = context
       .getStore(TestStatefulTask.STATE_TOPIC)
       .asInstanceOf[KeyValueStore[String, String]]

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
index ea6f03b..86fc0fd 100644
--- a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
+++ b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
@@ -119,21 +119,24 @@
             %td.key Finished
             %td= state.finishedTasks.size.toString
 
-      %h3 Partition Assignment
-      %table.table.table-striped.table-bordered.tablesorter#partitions-table
+      %h3 TaskName Assignment
+      %table.table.table-striped.table-bordered.tablesorter#taskids-table
         %thead
           %tr
             %th Task ID
-            %th Partitions
+            %th TaskName
+            %th SystemStreamPartitions
             %th Container
         %tbody
-          - for((taskId, partitions) <- state.taskPartitions)
-            %tr
-              %td= taskId
-              %td= partitions.map(_.getPartitionId).toList.sorted.mkString(", ")
-              %td
-                - val container = state.runningTasks(taskId)
-                %a(target="_blank" href="http://#{container.nodeHttpAddress}/node/containerlogs/#{container.id.toString}/#{username}")= container.id.toString
+          - for((taskId, taskNames) <- state.taskToTaskNames)
+            - for((taskName, ssps) <- taskNames)
+              %tr
+                %td= taskId
+                %td= taskName
+                %td= ssps.map(_.toString).toList.sorted.mkString(", ")
+                %td
+                  - val container = state.runningTasks(taskId)
+                  %a(target="_blank" href="http://#{container.nodeHttpAddress}/node/containerlogs/#{container.id.toString}/#{username}")= container.id.toString
 
     %div.tab-pane#config
       %h2 Config
@@ -154,7 +157,7 @@
     :javascript
       $(document).ready(function() {
         $("#containers-table").tablesorter();
-        $("#partitions-table").tablesorter();
+        $("#taskids-table").tablesorter();
         $("#config-table").tablesorter();
         $("#config-table-filter").keyup(function() {
           var regex = new RegExp($(this).val(), 'i');

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
index 01a2683..d9dfbc6 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
@@ -18,12 +18,12 @@
  */
 
 package org.apache.samza.job.yarn
-import org.apache.samza.config.Config
 import grizzled.slf4j.Logging
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
-import org.apache.samza.Partition
-import org.apache.hadoop.yarn.util.ConverterUtils
 import org.apache.hadoop.yarn.api.records.ContainerId
+import java.util
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.container.TaskName
 
 /**
  * Samza's application master has state that is usually manipulated based on
@@ -40,7 +40,7 @@ class SamzaAppMasterState(val taskId: Int, val containerId: ContainerId, val nod
   var unclaimedTasks = Set[Int]()
   var finishedTasks = Set[Int]()
   var runningTasks = Map[Int, YarnContainer]()
-  var taskPartitions = Map[Int, Set[Partition]]()
+  var taskToTaskNames = Map[Int, util.Map[TaskName, util.Set[SystemStreamPartition]]]()
   var status = FinalApplicationStatus.UNDEFINED
 
   // controlled by the service

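The replacement field nests Java collections (util.Map, util.Set) so the same object can be handed to the web UI and to the container command builder without conversion. A hedged sketch of what one entry looks like, mirroring the types above (state is assumed to be a SamzaAppMasterState):

    import java.util
    import org.apache.samza.Partition
    import org.apache.samza.container.TaskName
    import org.apache.samza.system.SystemStreamPartition

    // The SSPs owned by one task, keyed by its TaskName, for container (task) ID 0.
    val ssps = new util.HashSet[SystemStreamPartition]()
    ssps.add(new SystemStreamPartition("kafka", "input", new Partition(0)))

    val forContainer = new util.HashMap[TaskName, util.Set[SystemStreamPartition]]()
    forContainer.put(new TaskName(new Partition(0).toString), ssps)

    state.taskToTaskNames += 0 -> forContainer
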
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
index eb1ff54..0dd244d 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer
 import java.util.Collections
 
 import scala.collection.JavaConversions._
+import scala.collection.JavaConverters.mapAsJavaMapConverter
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.DataOutputBuffer
@@ -46,6 +47,7 @@ import org.apache.samza.job.ShellCommandBuilder
 import org.apache.samza.util.Util
 
 import grizzled.slf4j.Logging
+import org.apache.samza.container.TaskNamesToSystemStreamPartitions
 
 object SamzaAppMasterTaskManager {
   val DEFAULT_CONTAINER_MEM = 1024
@@ -72,7 +74,10 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
       1
   }
 
-  val allSystemStreamPartitions = Util.getInputStreamPartitions(config)
+  val tasksToSSPTaskNames: Map[Int, TaskNamesToSystemStreamPartitions] = Util.assignContainerToSSPTaskNames(config, state.taskCount)
+
+  val taskNameToChangeLogPartitionMapping = Util.getTaskNameToChangeLogPartitionMapping(config, tasksToSSPTaskNames)
+
   var taskFailures = Map[Int, TaskFailure]()
   var tooManyFailedContainers = false
   // TODO we might want to use NMClientAsync as well
@@ -106,13 +111,14 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
     state.unclaimedTasks.headOption match {
       case Some(taskId) => {
         info("Got available task id (%d) for container: %s" format (taskId, container))
-        val streamsAndPartitionsForTask = Util.getStreamsAndPartitionsForContainer(taskId, state.taskCount, allSystemStreamPartitions)
-        info("Claimed partitions %s for container ID %s" format (streamsAndPartitionsForTask, taskId))
+        val sspTaskNames: TaskNamesToSystemStreamPartitions = tasksToSSPTaskNames.getOrElse(taskId, TaskNamesToSystemStreamPartitions())
+        info("Claimed SSP taskNames %s for container ID %s" format (sspTaskNames, taskId))
         val cmdBuilderClassName = config.getCommandClass.getOrElse(classOf[ShellCommandBuilder].getName)
         val cmdBuilder = Class.forName(cmdBuilderClassName).newInstance.asInstanceOf[CommandBuilder]
           .setConfig(config)
           .setName("samza-container-%s" format taskId)
-          .setStreamPartitions(streamsAndPartitionsForTask)
+          .setTaskNameToSystemStreamPartitionsMapping(sspTaskNames.getJavaFriendlyType)
+          .setTaskNameToChangeLogPartitionMapping(taskNameToChangeLogPartitionMapping.map(kv => kv._1 -> Integer.valueOf(kv._2)).asJava)
         val command = cmdBuilder.buildCommand
         info("Task ID %s using command %s" format (taskId, command))
         val env = cmdBuilder.buildEnvironment.map { case (k, v) => (k, Util.envVarEscape(v)) }
@@ -129,7 +135,7 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
         state.neededContainers -= 1
         state.runningTasks += taskId -> new YarnContainer(container)
         state.unclaimedTasks -= taskId
-        state.taskPartitions += taskId -> streamsAndPartitionsForTask.map(_.getPartition).toSet
+        state.taskToTaskNames += taskId -> sspTaskNames.getJavaFriendlyType
 
         info("Claimed task ID %s for container %s on node %s (http://%s/node/containerlogs/%s)." format (taskId, containerIdStr, container.getNodeId.getHost, container.getNodeHttpAddress, containerIdStr))
 
@@ -151,7 +157,7 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
     taskId match {
       case Some(taskId) => {
         state.runningTasks -= taskId
-        state.taskPartitions -= taskId
+        state.taskToTaskNames -= taskId
       }
       case _ => None
     }
@@ -315,4 +321,5 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
     capability.setVirtualCores(cpuCores)
     (0 until containers).foreach(idx => amClient.addContainerRequest(new ContainerRequest(capability, null, null, priority)))
   }
+
 }
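
A small detail in the launch path above: the changelog mapping is a Scala map keyed by TaskName with Int values, while the command builder expects a Java map with boxed integers, hence the Integer.valueOf plus asJava conversion. The conversion on its own, with an illustrative entry:

    import scala.collection.JavaConverters.mapAsJavaMapConverter
    import org.apache.samza.container.TaskName

    val mapping: Map[TaskName, Int] = Map(new TaskName("Partition 0") -> 0) // illustrative
    val javaMapping: java.util.Map[TaskName, Integer] =
      mapping.map(kv => kv._1 -> Integer.valueOf(kv._2)).asJava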