Posted to commits@samza.apache.org by ni...@apache.org on 2015/04/29 04:57:50 UTC

[3/4] samza git commit: SAMZA-465: Use coordinator stream and eliminate CheckpointManager

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 3b6685e..e4b14f4 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -20,6 +20,9 @@
 package org.apache.samza.config
 
 import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.util.Logging
 
 object JobConfig {
   // job config constants
@@ -35,15 +38,45 @@ object JobConfig {
   val CONFIG_REWRITER_CLASS = "job.config.rewriter.%s.class" // streaming.job_config_rewriter_class - regex, system, config
   val JOB_NAME = "job.name" // streaming.job_name
   val JOB_ID = "job.id" // streaming.job_id
-
+  val JOB_COORDINATOR_SYSTEM = "job.coordinator.system"
+  val JOB_CONTAINER_COUNT = "job.container.count"
+  val JOB_REPLICATION_FACTOR = "job.coordinator.replication.factor"
+  val JOB_SEGMENT_BYTES = "job.coordinator.segment.bytes"
   val SSP_GROUPER_FACTORY = "job.systemstreampartition.grouper.factory"
 
   implicit def Config2Job(config: Config) = new JobConfig(config)
 }
 
-class JobConfig(config: Config) extends ScalaMapConfig(config) {
+class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
   def getName = getOption(JobConfig.JOB_NAME)
 
+  def getCoordinatorSystemName = getOption(JobConfig.JOB_COORDINATOR_SYSTEM).getOrElse({
+    // If no coordinator system is configured, infer it when exactly one system is configured.
+    val systemNames = config.getSystemNames.toSet
+    if (systemNames.size == 1) {
+      val systemName = systemNames.iterator.next
+      info("No coordinator system defined, so defaulting to %s" format systemName)
+      systemName
+    } else {
+      throw new ConfigException("Missing job.coordinator.system configuration.")
+    }
+  })
+
+  def getContainerCount = {
+    getOption(JobConfig.JOB_CONTAINER_COUNT) match {
+      case Some(count) => count.toInt
+      case _ =>
+        // To maintain backwards compatibility, honor yarn.container.count for now.
+        // TODO get rid of this in a future release.
+        getOption("yarn.container.count") match {
+          case Some(count) =>
+            warn("Configuration 'yarn.container.count' is deprecated. Please use %s." format JobConfig.JOB_CONTAINER_COUNT)
+            count.toInt
+          case _ => 1
+        }
+    }
+  }
+
   def getStreamJobFactoryClass = getOption(JobConfig.STREAM_JOB_FACTORY_CLASS)
 
   def getJobId = getOption(JobConfig.JOB_ID)
@@ -53,4 +86,30 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) {
   def getConfigRewriterClass(name: String) = getOption(JobConfig.CONFIG_REWRITER_CLASS format name)
 
   def getSystemStreamPartitionGrouperFactory = getOption(JobConfig.SSP_GROUPER_FACTORY).getOrElse(classOf[GroupByPartitionFactory].getCanonicalName)
+
+  val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint.segment.bytes"
+
+  def getCoordinatorReplicationFactor = getOption(JobConfig.JOB_REPLICATION_FACTOR) match {
+    case Some(rplFactor) => rplFactor
+    case _ =>
+      // TODO get rid of checkpoint configs in a future release
+      getOption("task.checkpoint.replication.factor") match {
+        case Some(rplFactor) =>
+          warn("Configuration 'task.checkpoint.replication.factor' is deprecated. Please use %s." format JobConfig.JOB_REPLICATION_FACTOR)
+          rplFactor
+        case _ => "3"
+      }
+  }
+
+  def getCoordinatorSegmentBytes = getOption(JobConfig.JOB_SEGMENT_BYTES) match {
+    case Some(segBytes) => segBytes
+    case _ =>
+      // TODO get rid of checkpoint configs in a future release
+      getOption("task.checkpoint.segment.bytes") match {
+        case Some(segBytes) =>
+          warn("Configuration 'task.checkpoint.segment.bytes' is deprecated. Please use %s." format JobConfig.JOB_SEGMENT_BYTES)
+          segBytes
+        case _ => "26214400"
+      }
+  }
 }
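
For illustration, a hedged sketch of how the new JobConfig accessors behave. The config keys and values below are hypothetical, but the inference and defaults follow the code in this hunk:

    import org.apache.samza.config.MapConfig
    import org.apache.samza.config.JobConfig.Config2Job
    import scala.collection.JavaConversions._

    val config = new MapConfig(Map(
      "job.name" -> "my-job",
      "systems.kafka.samza.factory" -> "org.apache.samza.system.kafka.KafkaSystemFactory"))
    config.getCoordinatorSystemName          // "kafka" -- inferred, since exactly one system is configured
    config.getContainerCount                 // 1 -- neither job.container.count nor yarn.container.count is set
    config.getCoordinatorReplicationFactor   // "3" -- the default when no replication factor is configured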

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
index 1a2dd44..e94a473 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
@@ -21,9 +21,9 @@ package org.apache.samza.config
 
 object ShellCommandConfig {
   /**
-   * This environment variable is used to store a JSON serialized map of all configuration.
+   * This environment variable is used to store a JSON serialized map of all coordinator system configs.
    */
-  val ENV_CONFIG = "SAMZA_CONFIG"
+  val ENV_COORDINATOR_SYSTEM_CONFIG = "SAMZA_COORDINATOR_SYSTEM_CONFIG"
 
   /**
    * The ID for a container. This is an integer number between 0 and
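
A minimal sketch of the consuming side of this rename; deserializing the variable back into a config with SamzaObjectMapper is an assumption based on how this patch serializes configs elsewhere, not something shown in this hunk:

    import org.apache.samza.config.MapConfig
    import org.apache.samza.config.ShellCommandConfig
    import org.apache.samza.serializers.model.SamzaObjectMapper

    // Assumption: the launched container reads the JSON map and wraps it in a MapConfig.
    val json = System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG)
    val coordinatorSystemConfig = new MapConfig(
      SamzaObjectMapper.getObjectMapper.readValue(json, classOf[java.util.Map[String, String]]))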

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
index adef09e..e172589 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
@@ -21,8 +21,6 @@ package org.apache.samza.config
 
 import org.apache.samza.util.Logging
 import scala.collection.JavaConversions._
-import org.apache.samza.SamzaException
-import org.apache.samza.util.Util
 import org.apache.samza.system.SystemStream
 
 object StreamConfig {

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index cd06c06..0b3a235 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -19,8 +19,8 @@
 
 package org.apache.samza.config
 
-import org.apache.samza.util.Util
 import org.apache.samza.system.SystemStream
+import org.apache.samza.util.Util
 
 object TaskConfig {
   // task config constants

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 720fbdc..56819e0 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -21,7 +21,7 @@ package org.apache.samza.container
 
 import java.io.File
 import org.apache.samza.SamzaException
-import org.apache.samza.checkpoint.{ CheckpointManagerFactory, OffsetManager }
+import org.apache.samza.checkpoint.{ CheckpointManager, OffsetManager }
 import org.apache.samza.config.Config
 import org.apache.samza.config.MetricsConfig.Config2Metrics
 import org.apache.samza.config.SerializerConfig.Config2Serializer
@@ -30,6 +30,7 @@ import org.apache.samza.config.StorageConfig.Config2Storage
 import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
 import org.apache.samza.metrics.JmxServer
 import org.apache.samza.metrics.JvmMetrics
 import org.apache.samza.metrics.MetricsRegistryMap
@@ -328,17 +329,16 @@ object SamzaContainer extends Logging {
 
     info("Got metrics reporters: %s" format reporters.keys)
 
-    val checkpointManager = config.getCheckpointManagerFactory match {
-      case Some(checkpointFactoryClassName) if (!checkpointFactoryClassName.isEmpty) =>
-        Util
-          .getObj[CheckpointManagerFactory](checkpointFactoryClassName)
-          .getCheckpointManager(config, samzaContainerMetrics.registry)
-      case _ => null
-    }
+    val coordinatorSystemConsumer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemConsumer(config, samzaContainerMetrics.registry)
+    val coordinatorSystemProducer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemProducer(config, samzaContainerMetrics.registry)
+    val checkpointManager = new CheckpointManager(coordinatorSystemProducer, coordinatorSystemConsumer, String.valueOf(containerId))
 
     info("Got checkpoint manager: %s" format checkpointManager)
 
-    val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, systemAdmins, offsetManagerMetrics)
+    val combinedOffsets: Map[SystemStreamPartition, String] =
+      containerModel.getTasks.values().flatMap(_.getCheckpointedOffsets).toMap
+
+    val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, systemAdmins, offsetManagerMetrics, combinedOffsets)
 
     info("Got offset manager: %s" format offsetManager)
 

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
index c14f2f6..5b43b58 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
@@ -19,61 +19,74 @@
 
 package org.apache.samza.coordinator
 
+
 import org.apache.samza.config.Config
-import org.apache.samza.job.model.JobModel
+import org.apache.samza.job.model.{ContainerModel, JobModel, TaskModel}
 import org.apache.samza.SamzaException
 import org.apache.samza.container.grouper.task.GroupByContainerCount
-import org.apache.samza.util.Util
-import org.apache.samza.checkpoint.CheckpointManagerFactory
 import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
 import java.util
 import org.apache.samza.container.TaskName
+import org.apache.samza.storage.ChangelogPartitionManager
 import org.apache.samza.util.Logging
-import org.apache.samza.config.TaskConfig.Config2Task
 import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.config.StorageConfig.Config2Storage
+import org.apache.samza.util.Util
 import scala.collection.JavaConversions._
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.Partition
-import org.apache.samza.job.model.TaskModel
 import org.apache.samza.system.StreamMetadataCache
 import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.serializers.model.SamzaObjectMapper
-import java.net.URL
 import org.apache.samza.system.SystemFactory
 import org.apache.samza.coordinator.server.HttpServer
-import org.apache.samza.checkpoint.CheckpointManager
+import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
 import org.apache.samza.coordinator.server.JobServlet
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, CoordinatorStreamSystemProducer, CoordinatorStreamMessage, CoordinatorStreamSystemFactory}
+import org.apache.samza.config.ConfigRewriter
 
+/**
+ * Helper companion object that is responsible for wiring up a JobCoordinator
+ * given a Config object.
+ */
 object JobCoordinator extends Logging {
+
   /**
-   * Build a JobCoordinator using a Samza job's configuration.
+   * @param coordinatorSystemConfig A config object that contains job.name,
+   * job.id, and all systems.<job-coordinator-system-name>.*
+   * configuration. The method uses this config to read all configuration
+   * from the coordinator stream and to instantiate a JobCoordinator.
    */
-  def apply(config: Config, containerCount: Int) = {
-    val jobModel = buildJobModel(config, containerCount)
-    val server = new HttpServer
-    server.addServlet("/*", new JobServlet(jobModel))
-    new JobCoordinator(jobModel, server)
+  def apply(coordinatorSystemConfig: Config, metricsRegistryMap: MetricsRegistryMap): JobCoordinator = {
+    val coordinatorStreamSystemFactory: CoordinatorStreamSystemFactory = new CoordinatorStreamSystemFactory()
+    val coordinatorSystemConsumer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(coordinatorSystemConfig, metricsRegistryMap)
+    val coordinatorSystemProducer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(coordinatorSystemConfig, metricsRegistryMap)
+    info("Registering coordinator system stream.")
+    coordinatorSystemConsumer.register
+    debug("Starting coordinator system stream.")
+    coordinatorSystemConsumer.start
+    debug("Bootstrapping coordinator system stream.")
+    coordinatorSystemConsumer.bootstrap
+    debug("Stopping coordinator system stream.")
+    coordinatorSystemConsumer.stop
+    val config = coordinatorSystemConsumer.getConfig
+    info("Got config: %s" format config)
+    val checkpointManager = new CheckpointManager(coordinatorSystemProducer, coordinatorSystemConsumer, "Job-coordinator")
+    val changelogManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, "Job-coordinator")
+    getJobCoordinator(rewriteConfig(config), checkpointManager, changelogManager)
   }
 
+  def apply(coordinatorSystemConfig: Config): JobCoordinator = apply(coordinatorSystemConfig, new MetricsRegistryMap())
+
   /**
-   * Gets a CheckpointManager from the configuration.
+   * Build a JobCoordinator using a Samza job's configuration.
    */
-  def getCheckpointManager(config: Config) = {
-    config.getCheckpointManagerFactory match {
-      case Some(checkpointFactoryClassName) =>
-        Util
-          .getObj[CheckpointManagerFactory](checkpointFactoryClassName)
-          .getCheckpointManager(config, new MetricsRegistryMap)
-      case _ =>
-        if (!config.getStoreNames.isEmpty) {
-          throw new SamzaException("Storage factories configured, but no checkpoint manager has been specified.  " +
-            "Unable to start job as there would be no place to store changelog partition mapping.")
-        }
-        null
-    }
+  def getJobCoordinator(config: Config, checkpointManager: CheckpointManager, changelogManager: ChangelogPartitionManager) = {
+    val containerCount = config.getContainerCount
+    val jobModelGenerator = initializeJobModel(config, containerCount, checkpointManager, changelogManager)
+    val server = new HttpServer
+    server.addServlet("/*", new JobServlet(jobModelGenerator))
+    new JobCoordinator(jobModelGenerator(), server, checkpointManager)
   }
 
   /**
@@ -115,74 +128,135 @@ object JobCoordinator extends Logging {
   }
 
   /**
-   * Build a full Samza job model using the job configuration.
+   * Re-writes configuration using a ConfigRewriter, if one is defined. If
+   * there is no ConfigRewriter defined for the job, then this method is a
+   * no-op.
+   *
+   * @param config The config to re-write.
    */
-  def buildJobModel(config: Config, containerCount: Int) = {
-    // TODO containerCount should go away when we generalize the job coordinator, 
+  def rewriteConfig(config: Config): Config = {
+    def rewrite(c: Config, rewriterName: String): Config = {
+      val klass = config
+        .getConfigRewriterClass(rewriterName)
+        .getOrElse(throw new SamzaException("Unable to find class config for config rewriter %s." format rewriterName))
+      val rewriter = Util.getObj[ConfigRewriter](klass)
+      info("Re-writing config with " + rewriter)
+      rewriter.rewrite(rewriterName, c)
+    }
+
+    config.getConfigRewriters match {
+      case Some(rewriters) => rewriters.split(",").foldLeft(config)(rewrite(_, _))
+      case _ => config
+    }
+  }
+
+  /**
+   * This method initializes the JobModel and creates a JobModel generator which can be used to generate new JobModels
+   * that catch up with the latest content from the coordinator stream.
+   */
+  private def initializeJobModel(config: Config,
+                                 containerCount: Int,
+                                 checkpointManager: CheckpointManager,
+                                 changelogManager: ChangelogPartitionManager): () => JobModel = {
+    // TODO containerCount should go away when we generalize the job coordinator,
     // and have a non-yarn-specific way of specifying container count.
-    val checkpointManager = getCheckpointManager(config)
+
+    // Do grouping to fetch TaskName to SSP mapping
     val allSystemStreamPartitions = getInputStreamPartitions(config)
     val grouper = getSystemStreamPartitionGrouper(config)
-    val previousChangelogeMapping = if (checkpointManager != null) {
-      checkpointManager.start
-      checkpointManager.readChangeLogPartitionMapping
-    } else {
-      new util.HashMap[TaskName, java.lang.Integer]()
+    info("SystemStreamPartitionGrouper " + grouper + " has grouped the SystemStreamPartitions into the following taskNames:")
+    val groups = grouper.group(allSystemStreamPartitions)
+
+    // Initialize the ChangelogPartitionManager and the CheckpointManager
+    val previousChangelogeMapping = if (changelogManager != null)
+    {
+      changelogManager.start
+      changelogManager.readChangeLogPartitionMapping
     }
-    var maxChangelogPartitionId = previousChangelogeMapping
-      .values
-      .map(_.toInt)
-      .toList
-      .sorted
-      .lastOption
-      .getOrElse(-1)
-
-    // Assign all SystemStreamPartitions to TaskNames.
-    val taskModels = {
-      val groups = grouper.group(allSystemStreamPartitions)
-      info("SystemStreamPartitionGrouper " + grouper + " has grouped the SystemStreamPartitions into the following taskNames:")
-      groups
-        .map {
-          case (taskName, systemStreamPartitions) =>
-            val changelogPartition = Option(previousChangelogeMapping.get(taskName)) match {
-              case Some(changelogPartitionId) => new Partition(changelogPartitionId)
-              case _ =>
-                // If we've never seen this TaskName before, then assign it a 
-                // new changelog.
-                maxChangelogPartitionId += 1
-                info("New task %s is being assigned changelog partition %s." format (taskName, maxChangelogPartitionId))
-                new Partition(maxChangelogPartitionId)
-            }
-            new TaskModel(taskName, systemStreamPartitions, changelogPartition)
-        }
-        .toSet
+    else
+    {
+      new util.HashMap[TaskName, java.lang.Integer]()
     }
+    checkpointManager.start
+    groups.foreach(taskSSP => checkpointManager.register(taskSSP._1))
+
+    // Generate the jobModel
+    def jobModelGenerator(): JobModel = refreshJobModel(config,
+                                                        allSystemStreamPartitions,
+                                                        checkpointManager,
+                                                        groups,
+                                                        previousChangelogeMapping,
+                                                        containerCount)
+
+    val jobModel = jobModelGenerator()
 
-    // Save the changelog mapping back to the checkpoint manager.
-    if (checkpointManager != null) {
-      // newChangelogMapping is the merging of all current task:changelog 
+    // Save the changelog mapping back to the ChangelogPartitionManager.
+    if (changelogManager != null) {
+      // newChangelogMapping is the merging of all current task:changelog
       // assignments with whatever we had before (previousChangelogeMapping).
-      // We must persist legacy changelog assignments so that 
-      // maxChangelogPartitionId always has the absolute max, not the current 
-      // max (in case the task with the highest changelog partition mapping 
+      // We must persist legacy changelog assignments so that
+      // maxChangelogPartitionId always has the absolute max, not the current
+      // max (in case the task with the highest changelog partition mapping
       // disappears.
-      val newChangelogMapping = taskModels.map(taskModel => {
-        taskModel.getTaskName -> Integer.valueOf(taskModel.getChangelogPartition.getPartitionId)
-      }).toMap ++ previousChangelogeMapping
+      val newChangelogMapping = jobModel.getContainers.flatMap(_._2.getTasks).map {
+        case (taskName, taskModel) => taskName -> Integer.valueOf(taskModel.getChangelogPartition.getPartitionId)
+      }.toMap ++ previousChangelogeMapping
       info("Saving task-to-changelog partition mapping: %s" format newChangelogMapping)
-      checkpointManager.writeChangeLogPartitionMapping(newChangelogMapping)
-      checkpointManager.stop
+      changelogManager.writeChangeLogPartitionMapping(newChangelogMapping)
     }
+    // Return a jobModelGenerator lambda that can be used to refresh the job model
+    jobModelGenerator
+  }
 
-    // Here is where we should put in a pluggable option for the 
-    // SSPTaskNameGrouper for locality, load-balancing, etc.
-    val containerGrouper = new GroupByContainerCount(containerCount)
-    val containerModels = containerGrouper
-      .group(taskModels)
-      .map { case (containerModel) => Integer.valueOf(containerModel.getContainerId) -> containerModel }
-      .toMap
+  /**
+   * Build a full Samza job model. The function reads the latest checkpoint from the underlying coordinator stream and
+   * builds a new JobModel.
+   * This method must be thread safe: it is invoked for every HTTP request from a container, and all invocations share
+   * the same underlying coordinator stream producer and consumer instances.
+   */
+  private def refreshJobModel(config: Config,
+                              allSystemStreamPartitions: util.Set[SystemStreamPartition],
+                              checkpointManager: CheckpointManager,
+                              groups: util.Map[TaskName, util.Set[SystemStreamPartition]],
+                              previousChangelogeMapping: util.Map[TaskName, Integer],
+                              containerCount: Int): JobModel = {
+    this.synchronized {
+      // If no mappings are present (the first time the job runs), default to -1 so that 0
+      // becomes the first changelog partition mapping.
+      var maxChangelogPartitionId = previousChangelogeMapping.values.map(_.toInt).toList.sorted.lastOption.getOrElse(-1)
+
+      // Assign all SystemStreamPartitions to TaskNames.
+      val taskModels = groups.map {
+        case (taskName, systemStreamPartitions) =>
+          val checkpoint = Option(checkpointManager.readLastCheckpoint(taskName))
+            .getOrElse(new Checkpoint(new util.HashMap[SystemStreamPartition, String]()))
+          // Seed every SSP with a null offset, then overlay whatever offsets the checkpoint holds.
+          val offsetMap = systemStreamPartitions.map(ssp => (ssp -> null)).toMap ++ checkpoint.getOffsets
+          val changelogPartition = Option(previousChangelogeMapping.get(taskName)) match {
+            case Some(changelogPartitionId) => new Partition(changelogPartitionId)
+            case _ =>
+              // If we've never seen this TaskName before, then assign it a
+              // new changelog.
+              maxChangelogPartitionId += 1
+              info("New task %s is being assigned changelog partition %s." format (taskName, maxChangelogPartitionId))
+              new Partition(maxChangelogPartitionId)
+          }
+          new TaskModel(taskName, offsetMap, changelogPartition)
+      }.toSet
+
+      // Here is where we should put in a pluggable option for the
+      // SSPTaskNameGrouper for locality, load-balancing, etc.
+      val containerGrouper = new GroupByContainerCount(containerCount)
+      val containerModels = containerGrouper.group(taskModels)
+        .map { containerModel => Integer.valueOf(containerModel.getContainerId) -> containerModel }.toMap
 
-    new JobModel(config, containerModels)
+      new JobModel(config, containerModels)
+    }
   }
 }
 
@@ -207,19 +281,31 @@ class JobCoordinator(
   /**
    * HTTP server used to serve a Samza job's container model to SamzaContainers when they start up.
    */
-  val server: HttpServer) extends Logging {
+  val server: HttpServer = null,
+
+  /**
+   * Handle to checkpoint manager that's used to refresh the JobModel
+   */
+  val checkpointManager: CheckpointManager) extends Logging {
 
   debug("Got job model: %s." format jobModel)
 
   def start {
-    debug("Starting HTTP server.")
-    server.start
-    info("Startd HTTP server: %s" format server.getUrl)
+    if (server != null) {
+      debug("Starting HTTP server.")
+      server.start
+      info("Startd HTTP server: %s" format server.getUrl)
+    }
   }
 
   def stop {
-    debug("Stopping HTTP server.")
-    server.stop
-    info("Stopped HTTP server.")
+    if (server != null) {
+      debug("Stopping HTTP server.")
+      server.stop
+      info("Stopped HTTP server.")
+      debug("Stopping checkpoint manager.")
+      checkpointManager.stop()
+      info("Stopped checkpoint manager.")
+    }
   }
 }
\ No newline at end of file
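
A minimal usage sketch of the new bootstrap path; `coordinatorSystemConfig` is assumed to have been produced by Util.buildCoordinatorStreamConfig from a full job config:

    import org.apache.samza.coordinator.JobCoordinator

    // Bootstraps the full job config from the coordinator stream, then builds the JobModel.
    val coordinator = JobCoordinator(coordinatorSystemConfig)
    coordinator.start   // serves the JobModel over HTTP at coordinator.server.getUrl
    // ... containers poll the JobServlet, which regenerates the JobModel on every request ...
    coordinator.stop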

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala
index 10986a4..dfe3a45 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala
@@ -20,8 +20,6 @@
 package org.apache.samza.coordinator.server;
 
 import java.net.InetAddress
-import java.net.URI
-import java.net.UnknownHostException
 import javax.servlet.Servlet
 import org.apache.samza.SamzaException
 import org.eclipse.jetty.server.Connector
@@ -32,6 +30,7 @@ import org.eclipse.jetty.servlet.ServletHolder
 import java.net.URL
 import org.apache.samza.util.Logging
 
+
 /**
  * <p>A Jetty-based HTTP server. The server allows arbitrary servlets to be added
  * with the addServlet() method. The server is configured to automatically

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
index 635c353..a3baddb 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
@@ -19,12 +19,13 @@
 
 package org.apache.samza.coordinator.server
 
+
 import org.apache.samza.job.model.JobModel
 import org.apache.samza.util.Logging
 
 /**
  * A servlet that dumps the job model for a Samza job.
  */
-class JobServlet(jobModel: JobModel) extends ServletBase with Logging {
-  protected def getObjectToWrite() = jobModel
-}
+class JobServlet(jobModelGenerator: () => JobModel) extends ServletBase with Logging {
+  protected def getObjectToWrite() = jobModelGenerator()
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala
new file mode 100644
index 0000000..9283812
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream
+
+import org.apache.samza.SamzaException
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.config.{Config, SystemConfig}
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.system.{SystemFactory, SystemStream}
+import org.apache.samza.util.Util
+
+/**
+ * A helper class that does wiring for CoordinatorStreamSystemConsumer and
+ * CoordinatorStreamSystemProducer. This factory should only be used in
+ * situations where the underlying SystemConsumer/SystemProducer does not
+ * exist.
+ */
+class CoordinatorStreamSystemFactory {
+  def getCoordinatorStreamSystemConsumer(config: Config, registry: MetricsRegistry) = {
+    val (coordinatorSystemStream, systemFactory) = Util.getCoordinatorSystemStreamAndFactory(config)
+    val systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem, config)
+    val systemConsumer = systemFactory.getConsumer(coordinatorSystemStream.getSystem, config, registry)
+    new CoordinatorStreamSystemConsumer(coordinatorSystemStream, systemConsumer, systemAdmin)
+  }
+
+  def getCoordinatorStreamSystemProducer(config: Config, registry: MetricsRegistry) = {
+    val (coordinatorSystemStream, systemFactory) = Util.getCoordinatorSystemStreamAndFactory(config)
+    val systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem, config)
+    val systemProducer = systemFactory.getProducer(coordinatorSystemStream.getSystem, config, registry)
+    new CoordinatorStreamSystemProducer(coordinatorSystemStream, systemProducer, systemAdmin)
+  }
+}
\ No newline at end of file
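
A hedged usage sketch mirroring the wiring in SamzaContainer and JobCoordinator elsewhere in this patch; `config` is assumed to be a config from which job.coordinator.system can be resolved:

    import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
    import org.apache.samza.metrics.MetricsRegistryMap

    val factory = new CoordinatorStreamSystemFactory
    val registry = new MetricsRegistryMap
    val consumer = factory.getCoordinatorStreamSystemConsumer(config, registry)  // reads configs/checkpoints back
    val producer = factory.getCoordinatorStreamSystemProducer(config, registry)  // writes coordinator messages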

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index 0b720ec..1c178a6 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -20,16 +20,27 @@
 package org.apache.samza.job
 
 import org.apache.samza.SamzaException
-import org.apache.samza.config.{Config, ConfigRewriter}
+import org.apache.samza.config.Config
 import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.factories.PropertiesConfigFactory
 import org.apache.samza.job.ApplicationStatus.Running
-import org.apache.samza.util.Util
 import org.apache.samza.util.CommandLine
 import org.apache.samza.util.Logging
+import org.apache.samza.util.Util
 import scala.collection.JavaConversions._
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.config.ConfigException
+import org.apache.samza.config.SystemConfig
+import org.apache.samza.system.SystemFactory
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetConfig
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer
+import org.apache.samza.system.SystemStream
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
+
+object JobRunner {
+  val SOURCE = "job-runner"
 
-object JobRunner extends Logging {
   def main(args: Array[String]) {
     val cmdline = new CommandLine
     val options = cmdline.parser.parse(args: _*)
@@ -43,23 +54,43 @@ object JobRunner extends Logging {
  * on a config URI. The configFactory is instantiated, fed the configPath,
  * and returns a Config, which is used to execute the job.
  */
-class JobRunner(config: Config) extends Logging with Runnable {
-
-  def run() {
-    val conf = rewriteConfig(config)
-
-    val jobFactoryClass = conf.getStreamJobFactoryClass match {
+class JobRunner(config: Config) extends Logging {
+  def run() = {
+    debug("config: %s" format (config))
+    val jobFactoryClass = config.getStreamJobFactoryClass match {
       case Some(factoryClass) => factoryClass
       case _ => throw new SamzaException("no job factory class defined")
     }
-
     val jobFactory = Class.forName(jobFactoryClass).newInstance.asInstanceOf[StreamJobFactory]
-
     info("job factory: %s" format (jobFactoryClass))
-    debug("config: %s" format (conf))
+    val factory = new CoordinatorStreamSystemFactory
+    val coordinatorSystemConsumer = factory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
+    val coordinatorSystemProducer = factory.getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap)
+
+    // Create the coordinator stream if it doesn't exist
+    info("Creating coordinator stream");
+    val (coordinatorSystemStream, systemFactory) = Util.getCoordinatorSystemStreamAndFactory(config)
+    val systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem, config)
+    systemAdmin.createCoordinatorStream(coordinatorSystemStream.getStream)
+
+    info("Storing config in coordinator stream.")
+    coordinatorSystemProducer.register(JobRunner.SOURCE)
+    coordinatorSystemProducer.start
+    coordinatorSystemProducer.writeConfig(JobRunner.SOURCE, config)
+    info("Loading old config from coordinator stream.")
+    coordinatorSystemConsumer.register
+    coordinatorSystemConsumer.start
+    coordinatorSystemConsumer.bootstrap
+    coordinatorSystemConsumer.stop
+    val oldConfig = coordinatorSystemConsumer.getConfig();
+    info("Deleting old configs that are no longer defined: %s".format(oldConfig.keySet -- config.keySet))
+    (oldConfig.keySet -- config.keySet).foreach(key => {
+      coordinatorSystemProducer.send(new CoordinatorStreamMessage.Delete(JobRunner.SOURCE, key, SetConfig.TYPE))
+    })
+    coordinatorSystemProducer.stop
 
     // Create the actual job, and submit it.
-    val job = jobFactory.getJob(conf).submit
+    val job = jobFactory.getJob(config).submit
 
     info("waiting for job to start")
 
@@ -76,22 +107,6 @@ class JobRunner(config: Config) extends Logging with Runnable {
     }
 
     info("exiting")
-  }
-
-  // Apply any and all config re-writer classes that the user has specified
-  def rewriteConfig(config: Config): Config = {
-    def rewrite(c: Config, rewriterName: String): Config = {
-      val klass = config
-        .getConfigRewriterClass(rewriterName)
-        .getOrElse(throw new SamzaException("Unable to find class config for config rewriter %s." format rewriterName))
-      val rewriter = Util.getObj[ConfigRewriter](klass)
-      info("Re-writing config file with " + rewriter)
-      rewriter.rewrite(rewriterName, c)
-    }
-
-    config.getConfigRewriters match {
-      case Some(rewriters) => rewriters.split(",").foldLeft(config)(rewrite(_, _))
-      case None => config
-    }
+    job
   }
 }
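
The reshaped run() persists the config to the coordinator stream, prunes keys that are no longer defined, and now returns the submitted job. A minimal sketch, assuming `config` is a full job config:

    import org.apache.samza.job.JobRunner

    val job = new JobRunner(config).run()  // run() now returns the submitted StreamJob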

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
index fd9719a..4fac154 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
@@ -30,7 +30,7 @@ import org.apache.samza.util.{Logging, Util}
  */
 class ProcessJobFactory extends StreamJobFactory with Logging {
   def   getJob(config: Config): StreamJob = {
-    val coordinator = JobCoordinator(config, 1)
+    val coordinator = JobCoordinator(config)
     val containerModel = coordinator.jobModel.getContainers.get(0)
 
     val commandBuilder = {
@@ -48,11 +48,12 @@ class ProcessJobFactory extends StreamJobFactory with Logging {
     }
     // JobCoordinator is stopped by ProcessJob when it exits
     coordinator.start
+    val coordinatorSystemConfig = Util.buildCoordinatorStreamConfig(config)
 
     commandBuilder
-      .setConfig(config)
-      .setId(0)
-      .setUrl(coordinator.server.getUrl)
+            .setConfig(coordinatorSystemConfig)
+            .setId(0)
+            .setUrl(coordinator.server.getUrl)
 
     new ProcessJob(commandBuilder, coordinator)
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index 530255e..60ee36f 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -19,6 +19,8 @@
 
 package org.apache.samza.job.local
 
+
+import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.util.Logging
 import org.apache.samza.SamzaException
 import org.apache.samza.config.Config
@@ -26,7 +28,6 @@ import org.apache.samza.config.ShellCommandConfig._
 import org.apache.samza.config.TaskConfig._
 import org.apache.samza.container.SamzaContainer
 import org.apache.samza.job.{ StreamJob, StreamJobFactory }
-import org.apache.samza.util.Util
 import org.apache.samza.config.JobConfig._
 import org.apache.samza.coordinator.JobCoordinator
 
@@ -36,7 +37,7 @@ import org.apache.samza.coordinator.JobCoordinator
 class ThreadJobFactory extends StreamJobFactory with Logging {
   def getJob(config: Config): StreamJob = {
     info("Creating a ThreadJob, which is only meant for debugging.")
-    val coordinator = JobCoordinator(config, 1)
+    val coordinator = JobCoordinator(config)
     val containerModel = coordinator.jobModel.getContainers.get(0)
 
     // Give developers a nice friendly warning if they've specified task.opts and are using a threaded job.

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala
index 744eec0..4b658c1 100644
--- a/samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala
@@ -18,19 +18,33 @@
  */
 
 package org.apache.samza.serializers
+
+import org.apache.samza.SamzaException
+import org.apache.samza.serializers.model.SamzaObjectMapper
+import org.codehaus.jackson.`type`.TypeReference
 import org.codehaus.jackson.map.ObjectMapper
-import java.nio.ByteBuffer
 import org.apache.samza.config.Config
 
-class JsonSerde extends Serde[Object] {
-  val objectMapper = new ObjectMapper
+class JsonSerde[T] extends Serde[T] {
+  val mapper = SamzaObjectMapper.getObjectMapper()
+
+  def toBytes(obj: T): Array[Byte] = {
+    try {
+      mapper.writeValueAsString(obj).getBytes("UTF-8")
+    }
+    catch {
+      case e: Exception => throw new SamzaException(e)
+    }
+  }
 
-  def toBytes(obj: Object) = objectMapper
-    .writeValueAsString(obj)
-    .getBytes("UTF-8")
+  def fromBytes(bytes: Array[Byte]): T = {
+    try {
+      mapper.readValue(new String(bytes, "UTF-8"), new TypeReference[T]() {})
+    }
+    catch {
+      case e: Exception => throw new SamzaException(e)
+    }
+  }
 
-  def fromBytes(bytes: Array[Byte]) = objectMapper
-    .readValue(bytes, classOf[Object])
 }
 
 class JsonSerdeFactory extends SerdeFactory[Object] {
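
A hedged round-trip sketch of the now-generic JsonSerde; the payload type and values here are arbitrary:

    import java.util.Collections
    import org.apache.samza.serializers.JsonSerde

    val serde = new JsonSerde[java.util.Map[String, String]]
    val bytes = serde.toBytes(Collections.singletonMap("key", "value"))  // UTF-8 JSON via SamzaObjectMapper
    val restored: java.util.Map[String, String] = serde.fromBytes(bytes)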

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
index ec1d749..097f410 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
@@ -131,7 +131,11 @@ class FileReaderSystemAdmin extends SystemAdmin with Logging {
     enterPosition
   }
 
-  override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) = {
-    throw new SamzaException("Method not implemented")
+  def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) = {
+    throw new UnsupportedOperationException("Method not implemented.")
+  }
+
+  def createCoordinatorStream(streamName: String) {
+    throw new UnsupportedOperationException("Method not implemented.")
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 1a67586..8a83566 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -19,14 +19,20 @@
 
 package org.apache.samza.util
 
-import java.net.URL
-import java.io.BufferedReader
+import java.net.{HttpURLConnection, URL}
+import java.io.{InputStream, BufferedReader, File, InputStreamReader}
 import java.lang.management.ManagementFactory
-import java.io.File
-import org.apache.samza.system.SystemStream
+import org.apache.samza.{SamzaException, Partition}
+import org.apache.samza.system.{SystemFactory, SystemStreamPartition, SystemStream}
 import java.util.Random
-import org.apache.samza.job.model.JobModel
-import java.io.InputStreamReader
+import org.apache.samza.config.Config
+import org.apache.samza.config.SystemConfig
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.config.ConfigException
+import org.apache.samza.config.MapConfig
+import scala.collection.JavaConversions._
+import org.apache.samza.config.JobConfig
 
 object Util extends Logging {
   val random = new Random
@@ -114,14 +120,121 @@ object Util extends Logging {
    * @param timeout How long to wait before timing out when connecting to or reading from the HTTP server.
    * @return String payload of the body of the HTTP response.
    */
-  def read(url: URL, timeout: Int = 30000): String = {
-    val conn = url.openConnection();
+  def read(url: URL, timeout: Int = 60000): String = {
+    var httpConn = getHttpConnection(url, timeout)
+    val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy
+    retryBackoff.run(loop => {
+      if (httpConn.getResponseCode != 200) {
+        warn("Error: " + httpConn.getResponseCode)
+        val errorContent = readStream(httpConn.getErrorStream)
+        warn("Error reading stream, failed with response %s" format errorContent)
+        httpConn = getHttpConnection(url, timeout)
+      } else {
+        loop.done
+      }
+    },
+    (exception, loop) => {
+      exception match {
+        case e: Exception =>
+          loop.done
+          error("Unable to connect to Job coordinator server, received exception", e)
+          throw e
+      }
+    })
+
+    if (httpConn.getResponseCode != 200) {
+      throw new SamzaException("Unable to read JobModel from the JobCoordinator HTTP server.")
+    }
+    readStream(httpConn.getInputStream)
+  }
+
+  private def getHttpConnection(url: URL, timeout: Int): HttpURLConnection = {
+    val conn = url.openConnection()
     conn.setConnectTimeout(timeout)
     conn.setReadTimeout(timeout)
-    val br = new BufferedReader(new InputStreamReader(conn.getInputStream));
+    conn.asInstanceOf[HttpURLConnection]
+  }
+  private def readStream(stream: InputStream): String = {
+    val br = new BufferedReader(new InputStreamReader(stream));
     var line: String = null;
     val body = Iterator.continually(br.readLine()).takeWhile(_ != null).mkString
     br.close
+    stream.close
     body
   }
+
+
+  /**
+   * Generates a coordinator stream name from the job name and job id.
+   * The stream name will be of the form
+   * __samza_coordinator_&lt;JOBNAME&gt;_&lt;JOBID&gt;.
+   */
+  def getCoordinatorStreamName(jobName: String, jobId: String) = {
+    "__samza_coordinator_%s_%s" format (jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-"))
+  }
+
+  /**
+   * Get a job's name and ID from a config. The job ID defaults to 1 if not
+   * defined in the config; the job name must be defined.
+   *
+   * @return A tuple of (jobName, jobId)
+   */
+  def getJobNameAndId(config: Config) = {
+    (config.getName.getOrElse(throw new ConfigException("Missing required config: job.name")), config.getJobId.getOrElse("1"))
+  }
+
+  /**
+   * Given a job's full config object, build a subset config which includes
+   * only the job name, job id, and system config for the coordinator stream.
+   */
+  def buildCoordinatorStreamConfig(config: Config) = {
+    val (jobName, jobId) = getJobNameAndId(config)
+    // Build a map with just the system config and job.name/job.id. This is what's required to start the JobCoordinator.
+    new MapConfig(config.subset(SystemConfig.SYSTEM_PREFIX format config.getCoordinatorSystemName, false) ++
+      Map[String, String](JobConfig.JOB_NAME -> jobName, JobConfig.JOB_ID -> jobId, JobConfig.JOB_COORDINATOR_SYSTEM -> config.getCoordinatorSystemName))
+  }
+
+  /**
+   * Get the coordinator SystemStream and SystemFactory from the configuration.
+   * @param config The job configuration.
+   * @return A tuple of (coordinator SystemStream, SystemFactory).
+   */
+  def getCoordinatorSystemStreamAndFactory(config: Config) = {
+    val systemName = config.getCoordinatorSystemName
+    val (jobName, jobId) = Util.getJobNameAndId(config)
+    val streamName = Util.getCoordinatorStreamName(jobName, jobId)
+    val coordinatorSystemStream = new SystemStream(systemName, streamName)
+    val systemFactoryClassName = config
+      .getSystemFactory(systemName)
+      .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY format systemName))
+    val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName)
+    (coordinatorSystemStream, systemFactory)
+  }
+
+  /**
+   * Helper that converts a SystemStreamPartition to its string form, "system.stream.partition".
+   * @param ssp System stream partition
+   * @return The string representation of the SSP
+   */
+  def sspToString(ssp: SystemStreamPartition): String = {
+     ssp.getSystem() + "." + ssp.getStream() + "." + String.valueOf(ssp.getPartition().getPartitionId())
+  }
+
+  /**
+   * Parses the string form of an SSP back into a SystemStreamPartition.
+   * @param ssp The string form of the SSP
+   * @return An SSP typed object
+   */
+  def stringToSsp(ssp: String): SystemStreamPartition = {
+     val idx = ssp.indexOf('.');
+     val lastIdx = ssp.lastIndexOf('.')
+     if (idx < 0 || lastIdx < 0) {
+       throw new IllegalArgumentException("System stream partition expected in format 'system.stream.partition'")
+     }
+     new SystemStreamPartition(new SystemStream(ssp.substring(0, idx), ssp.substring(idx + 1, lastIdx)),
+                               new Partition(Integer.parseInt(ssp.substring(lastIdx + 1))))
+  }
 }
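
Worked examples of the new naming and SSP round-trip helpers, following the code above; the job, system, and stream names are hypothetical:

    import org.apache.samza.Partition
    import org.apache.samza.system.{SystemStream, SystemStreamPartition}
    import org.apache.samza.util.Util

    Util.getCoordinatorStreamName("my_job", "1")   // "__samza_coordinator_my-job_1" (underscores become dashes)

    val ssp = new SystemStreamPartition(new SystemStream("kafka", "PageViews"), new Partition(3))
    Util.sspToString(ssp)                          // "kafka.PageViews.3"
    Util.stringToSsp("kafka.PageViews.3") == ssp   // true: the helpers round-trip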

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
new file mode 100644
index 0000000..59782fe
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
+import org.apache.samza.util.Util;
+
+/**
+ * Helper for creating a mock CoordinatorStreamSystemConsumer and
+ * CoordinatorStreamSystemProducer. The consumer is meant to just
+ * forward all configs to the JobCoordinator, which is useful for mocking in unit
+ * tests.
+ */
+public class MockCoordinatorStreamSystemFactory implements SystemFactory {
+
+  private static SystemConsumer mockConsumer = null;
+  private static boolean useCachedConsumer = false;
+  public static void enableMockConsumerCache() {
+    mockConsumer = null;
+    useCachedConsumer = true;
+  }
+
+  public static void disableMockConsumerCache() {
+    useCachedConsumer = false;
+    mockConsumer = null;
+  }
+
+  /**
+   * Returns a consumer that sends all configs to the coordinator stream.
+   * @param config Along with the configs, checkpoint and changelog messages can be passed into the stream.
+   *               The expected patterns are cp:source:taskname -> ssp:offset for a checkpoint (use the sspToString util)
+   *                                     and ch:source:taskname -> changelogPartition for a changelog mapping.
+   *               Everything else is processed as normal config.
+   */
+  public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
+
+    if(useCachedConsumer && mockConsumer != null) {
+      return mockConsumer;
+    }
+
+    String jobName = config.get("job.name");
+    String jobId = config.get("job.id");
+    if (jobName == null) {
+      throw new ConfigException("Must define job.name.");
+    }
+    if (jobId == null) {
+      jobId = "1";
+    }
+    String streamName = Util.getCoordinatorStreamName(jobName, jobId);
+    SystemStreamPartition systemStreamPartition = new SystemStreamPartition(systemName, streamName, new Partition(0));
+    mockConsumer = new MockCoordinatorStreamWrappedConsumer(systemStreamPartition, config);
+    return mockConsumer;
+  }
+
+  /**
+   * Returns a no-op producer.
+   */
+  public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
+    // A do-nothing producer.
+    return new SystemProducer() {
+      public void start() {
+      }
+
+      public void stop() {
+      }
+
+      public void register(String source) {
+      }
+
+      public void send(String source, OutgoingMessageEnvelope envelope) {
+      }
+
+      public void flush(String source) {
+      }
+    };
+  }
+
+  /**
+   * Returns a single partition admin that pretends to create a coordinator
+   * stream.
+   */
+  public SystemAdmin getAdmin(String systemName, Config config) {
+    return new MockSystemAdmin();
+  }
+
+  public static final class MockSystemAdmin extends SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin {
+    public void createCoordinatorStream(String streamName) {
+      // Do nothing.
+    }
+  }
+}
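
A hedged sketch of a config a test might feed this mock, following the key patterns documented on getConsumer; the source, task name, SSP, and offset are hypothetical:

    import org.apache.samza.config.MapConfig
    import scala.collection.JavaConversions._

    val mockConfig = new MapConfig(Map(
      "job.name" -> "my-job",
      "cp:job-runner:Partition 0" -> "kafka.PageViews.0:42",  // checkpoint: SSP (sspToString form) -> offset
      "ch:job-runner:Partition 0" -> "7"))                    // changelog partition for the task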

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
new file mode 100644
index 0000000..00a2d59
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetConfig;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.BlockingEnvelopeMap;
+import org.apache.samza.util.Util;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * A mock SystemConsumer that pretends to be a coordinator stream. The mock
+ * takes all config given to it and publishes it to the coordinator stream's
+ * SystemStreamPartition. This is useful when config needs to be passed
+ * quickly from a unit test into the JobCoordinator.
+ */
+public class MockCoordinatorStreamWrappedConsumer extends BlockingEnvelopeMap {
+  private static final ObjectMapper MAPPER = SamzaObjectMapper.getObjectMapper();
+  public static final String CHANGELOGPREFIX = "ch:";
+  public static final String CHECKPOINTPREFIX = "cp:";
+  public final CountDownLatch blockConsumerPoll = new CountDownLatch(1);
+  public boolean blockpollFlag = false;
+
+  private final SystemStreamPartition systemStreamPartition;
+  private final Config config;
+
+  public MockCoordinatorStreamWrappedConsumer(SystemStreamPartition systemStreamPartition, Config config) {
+    super();
+    this.config = config;
+    this.systemStreamPartition = systemStreamPartition;
+  }
+
+  public void start() {
+    convertConfigToCoordinatorMessage(config);
+  }
+
+  public void addMoreMessages(Config config) {
+    convertConfigToCoordinatorMessage(config);
+  }
+
+  private void convertConfigToCoordinatorMessage(Config config) {
+    try {
+      for (Map.Entry<String, String> configPair : config.entrySet()) {
+        byte[] keyBytes = null;
+        byte[] messageBytes = null;
+        if (configPair.getKey().startsWith(CHECKPOINTPREFIX)) {
+          String[] checkpointInfo = configPair.getKey().split(":");
+          String[] sspOffsetPair = configPair.getValue().split(":");
+          HashMap<SystemStreamPartition, String> checkpointMap = new HashMap<SystemStreamPartition, String>();
+          checkpointMap.put(Util.stringToSsp(sspOffsetPair[0]), sspOffsetPair[1]);
+          Checkpoint cp = new Checkpoint(checkpointMap);
+          CoordinatorStreamMessage.SetCheckpoint setCheckpoint = new CoordinatorStreamMessage.SetCheckpoint(checkpointInfo[1], checkpointInfo[2], cp);
+          keyBytes = MAPPER.writeValueAsString(setCheckpoint.getKeyArray()).getBytes("UTF-8");
+          messageBytes = MAPPER.writeValueAsString(setCheckpoint.getMessageMap()).getBytes("UTF-8");
+        } else if (configPair.getKey().startsWith(CHANGELOGPREFIX)) {
+          String[] changelogInfo = configPair.getKey().split(":");
+          String changeLogPartition = configPair.getValue();
+          CoordinatorStreamMessage.SetChangelogMapping changelogMapping = new CoordinatorStreamMessage.SetChangelogMapping(changelogInfo[1], changelogInfo[2], Integer.parseInt(changeLogPartition));
+          keyBytes = MAPPER.writeValueAsString(changelogMapping.getKeyArray()).getBytes("UTF-8");
+          messageBytes = MAPPER.writeValueAsString(changelogMapping.getMessageMap()).getBytes("UTF-8");
+        } else {
+          SetConfig setConfig = new SetConfig("source", configPair.getKey(), configPair.getValue());
+          keyBytes = MAPPER.writeValueAsString(setConfig.getKeyArray()).getBytes("UTF-8");
+          messageBytes = MAPPER.writeValueAsString(setConfig.getMessageMap()).getBytes("UTF-8");
+        }
+        // The SSP here is the coordinator SSP (which is always fixed), not the task SSP.
+        put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, "", keyBytes, messageBytes));
+      }
+      setIsAtHead(systemStreamPartition, true);
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  @Override
+  public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
+      Set<SystemStreamPartition> systemStreamPartitions, long timeout)
+      throws InterruptedException {
+
+    if (blockpollFlag) {
+      blockConsumerPoll.await();
+    }
+
+    return super.poll(systemStreamPartitions, timeout);
+  }
+
+  public CountDownLatch blockPool() {
+    blockpollFlag = true;
+    return blockConsumerPoll;
+  }
+
+  public void stop() {
+  }
+}
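
To recap the conventions in convertConfigToCoordinatorMessage(): keys prefixed with
"cp:" become SetCheckpoint messages (the value is parsed as "<ssp>:<offset>"), keys
prefixed with "ch:" become SetChangelogMapping messages (the value is the changelog
partition), and all other keys become plain SetConfig messages. Below is a minimal
sketch of the plain-config path; the system and stream names are illustrative only
(the real stream name comes from Util.getCoordinatorStreamName).

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.samza.Partition;
    import org.apache.samza.config.MapConfig;
    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.system.SystemStreamPartition;

    SystemStreamPartition coordinatorSsp =
        new SystemStreamPartition("coordinator", "coordinator-stream", new Partition(0));
    Map<String, String> map = new HashMap<String, String>();
    map.put("job.name", "test-job"); // no prefix, so this becomes a SetConfig message
    MockCoordinatorStreamWrappedConsumer consumer =
        new MockCoordinatorStreamWrappedConsumer(coordinatorSsp, new MapConfig(map));
    consumer.register(coordinatorSsp, ""); // BlockingEnvelopeMap queues are per registered SSP
    consumer.start();                      // serializes the config into the coordinator SSP
    // poll() declares InterruptedException; handle or propagate it in a real test.
    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopes =
        consumer.poll(Collections.singleton(coordinatorSsp), 1000);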

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
new file mode 100644
index 0000000..15181bb
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import static org.junit.Assert.*;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.Delete;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetConfig;
+import org.junit.Test;
+
+public class TestCoordinatorStreamMessage {
+  @Test
+  public void testCoordinatorStreamMessage() {
+    CoordinatorStreamMessage message = new CoordinatorStreamMessage("source");
+    assertEquals("source", message.getSource());
+    assertEquals(CoordinatorStreamMessage.VERSION, message.getVersion());
+    assertNotNull(message.getUsername());
+    assertTrue(message.getTimestamp() > 0);
+    assertFalse(message.isDelete());
+    CoordinatorStreamMessage secondMessage = new CoordinatorStreamMessage(message.getKeyArray(), message.getMessageMap());
+    assertEquals(secondMessage, message);
+  }
+
+  @Test
+  public void testCoordinatorStreamMessageIsDelete() {
+    CoordinatorStreamMessage message = new CoordinatorStreamMessage(new Object[] {}, null);
+    assertTrue(message.isDelete());
+    assertNull(message.getMessageMap());
+  }
+
+  @Test
+  public void testSetConfig() {
+    SetConfig setConfig = new SetConfig("source", "key", "value");
+    assertEquals(SetConfig.TYPE, setConfig.getType());
+    assertEquals("key", setConfig.getKey());
+    assertEquals("value", setConfig.getConfigValue());
+    assertFalse(setConfig.isDelete());
+    assertEquals(CoordinatorStreamMessage.VERSION, setConfig.getVersion());
+  }
+
+  @Test
+  public void testDelete() {
+    Delete delete = new Delete("source2", "key", "delete-type");
+    assertEquals("delete-type", delete.getType());
+    assertEquals("key", delete.getKey());
+    assertNull(delete.getMessageMap());
+    assertTrue(delete.isDelete());
+    assertEquals(CoordinatorStreamMessage.VERSION, delete.getVersion());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
new file mode 100644
index 0000000..5e193f8
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.Delete;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetConfig;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
+import org.junit.Test;
+
+public class TestCoordinatorStreamSystemConsumer {
+  @Test
+  public void testCoordinatorStreamSystemConsumer() {
+    Map<String, String> expectedConfig = new HashMap<String, String>();
+    expectedConfig.put("job.id", "1234");
+    SystemStream systemStream = new SystemStream("system", "stream");
+    MockSystemConsumer systemConsumer = new MockSystemConsumer(new SystemStreamPartition(systemStream, new Partition(0)));
+    CoordinatorStreamSystemConsumer consumer = new CoordinatorStreamSystemConsumer(systemStream, systemConsumer, new SinglePartitionWithoutOffsetsSystemAdmin());
+    assertFalse(systemConsumer.isRegistered());
+    consumer.register();
+    assertTrue(systemConsumer.isRegistered());
+    assertFalse(systemConsumer.isStarted());
+    consumer.start();
+    assertTrue(systemConsumer.isStarted());
+    try {
+      consumer.getConfig();
+      fail("Should have failed when retrieving config before bootstrapping.");
+    } catch (SamzaException e) {
+      // Expected.
+    }
+    consumer.bootstrap();
+    assertEquals(expectedConfig, consumer.getConfig());
+    assertFalse(systemConsumer.isStopped());
+    consumer.stop();
+    assertTrue(systemConsumer.isStopped());
+  }
+
+  private static class MockSystemConsumer implements SystemConsumer {
+    private boolean started = false;
+    private boolean stopped = false;
+    private boolean registered = false;
+    private final SystemStreamPartition expectedSystemStreamPartition;
+    private int pollCount = 0;
+
+    public MockSystemConsumer(SystemStreamPartition expectedSystemStreamPartition) {
+      this.expectedSystemStreamPartition = expectedSystemStreamPartition;
+    }
+
+    public void start() {
+      started = true;
+    }
+
+    public void stop() {
+      stopped = true;
+    }
+
+    public void register(SystemStreamPartition systemStreamPartition, String offset) {
+      registered = true;
+      assertEquals(expectedSystemStreamPartition, systemStreamPartition);
+    }
+
+    public boolean isRegistered() {
+      return registered;
+    }
+
+    public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
+      Map<SystemStreamPartition, List<IncomingMessageEnvelope>> map = new HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>();
+      assertEquals(1, systemStreamPartitions.size());
+      SystemStreamPartition systemStreamPartition = systemStreamPartitions.iterator().next();
+      assertEquals(expectedSystemStreamPartition, systemStreamPartition);
+
+      if (pollCount++ == 0) {
+        List<IncomingMessageEnvelope> list = new ArrayList<IncomingMessageEnvelope>();
+        SetConfig setConfig1 = new SetConfig("test", "job.name", "my-job-name");
+        SetConfig setConfig2 = new SetConfig("test", "job.id", "1234");
+        Delete delete = new Delete("test", "job.name", SetConfig.TYPE);
+        list.add(new IncomingMessageEnvelope(systemStreamPartition, null, serialize(setConfig1.getKeyArray()), serialize(setConfig1.getMessageMap())));
+        list.add(new IncomingMessageEnvelope(systemStreamPartition, null, serialize(setConfig2.getKeyArray()), serialize(setConfig2.getMessageMap())));
+        list.add(new IncomingMessageEnvelope(systemStreamPartition, null, serialize(delete.getKeyArray()), delete.getMessageMap()));
+        map.put(systemStreamPartition, list);
+      }
+
+      return map;
+    }
+
+    private byte[] serialize(Object obj) {
+      try {
+        return SamzaObjectMapper.getObjectMapper().writeValueAsString(obj).getBytes("UTF-8");
+      } catch (Exception e) {
+        throw new SamzaException(e);
+      }
+    }
+
+    public boolean isStarted() {
+      return started;
+    }
+
+    public boolean isStopped() {
+      return stopped;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
new file mode 100644
index 0000000..728fa53
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.Delete;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetConfig;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
+import org.codehaus.jackson.type.TypeReference;
+import org.junit.Test;
+
+public class TestCoordinatorStreamSystemProducer {
+  @Test
+  public void testCoordinatorStreamSystemProducer() {
+    String source = "source";
+    SystemStream systemStream = new SystemStream("system", "stream");
+    MockSystemProducer systemProducer = new MockSystemProducer(source);
+    MockSystemAdmin systemAdmin = new MockSystemAdmin();
+    CoordinatorStreamSystemProducer producer = new CoordinatorStreamSystemProducer(systemStream, systemProducer, systemAdmin);
+    SetConfig setConfig1 = new SetConfig(source, "job.name", "my-job-name");
+    SetConfig setConfig2 = new SetConfig(source, "job.id", "1234");
+    Delete delete = new Delete(source, "job.name", SetConfig.TYPE);
+    assertFalse(systemProducer.isRegistered());
+    producer.register(source);
+    assertTrue(systemProducer.isRegistered());
+    assertFalse(systemProducer.isStarted());
+    producer.start();
+    assertTrue(systemProducer.isStarted());
+    producer.send(setConfig1);
+    producer.send(setConfig2);
+    producer.send(delete);
+    assertFalse(systemProducer.isStopped());
+    producer.stop();
+    assertTrue(systemProducer.isStopped());
+    List<OutgoingMessageEnvelope> envelopes = systemProducer.getEnvelopes();
+    assertEquals(3, envelopes.size());
+    OutgoingMessageEnvelope envelope0 = envelopes.get(0);
+    OutgoingMessageEnvelope envelope1 = envelopes.get(1);
+    OutgoingMessageEnvelope envelope2 = envelopes.get(2);
+    TypeReference<Object[]> keyRef = new TypeReference<Object[]>() {};
+    TypeReference<Map<String, Object>> msgRef = new TypeReference<Map<String, Object>>() {};
+    assertEquals(new CoordinatorStreamMessage(setConfig1), new CoordinatorStreamMessage(deserialize((byte[]) envelope0.getKey(), keyRef), deserialize((byte[]) envelope0.getMessage(), msgRef)));
+    assertEquals(new CoordinatorStreamMessage(setConfig2), new CoordinatorStreamMessage(deserialize((byte[]) envelope1.getKey(), keyRef), deserialize((byte[]) envelope1.getMessage(), msgRef)));
+    assertEquals(new CoordinatorStreamMessage(delete), new CoordinatorStreamMessage(deserialize((byte[]) envelope2.getKey(), keyRef), deserialize((byte[]) envelope2.getMessage(), msgRef)));
+  }
+
+  private <T> T deserialize(byte[] bytes, TypeReference<T> reference) {
+    try {
+      if (bytes != null) {
+        String valueStr = new String(bytes, "UTF-8");
+        return SamzaObjectMapper.getObjectMapper().readValue(valueStr, reference);
+      }
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+
+    return null;
+  }
+
+  private static class MockSystemAdmin extends SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin {
+    public void createCoordinatorStream(String streamName) {
+      // Do nothing.
+    }
+  }
+
+  private static class MockSystemProducer implements SystemProducer {
+    private final String expectedSource;
+    private final List<OutgoingMessageEnvelope> envelopes;
+    private boolean started = false;
+    private boolean stopped = false;
+    private boolean registered = false;
+    private boolean flushed = false;
+
+    public MockSystemProducer(String expectedSource) {
+      this.expectedSource = expectedSource;
+      this.envelopes = new ArrayList<OutgoingMessageEnvelope>();
+    }
+
+    public void start() {
+      started = true;
+    }
+
+    public void stop() {
+      stopped = true;
+    }
+
+    public void register(String source) {
+      assertEquals(expectedSource, source);
+      registered = true;
+    }
+
+    public void send(String source, OutgoingMessageEnvelope envelope) {
+      envelopes.add(envelope);
+    }
+
+    public void flush(String source) {
+      assertEquals(expectedSource, source);
+      flushed = true;
+    }
+
+    public List<OutgoingMessageEnvelope> getEnvelopes() {
+      return envelopes;
+    }
+
+    public boolean isStarted() {
+      return started;
+    }
+
+    public boolean isStopped() {
+      return stopped;
+    }
+
+    public boolean isRegistered() {
+      return registered;
+    }
+
+    public boolean isFlushed() {
+      return flushed;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
index 76bc681..72b134c 100644
--- a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
+++ b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
@@ -40,12 +40,12 @@ public class TestSamzaObjectMapper {
   public void testJsonTaskModel() throws Exception {
     ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
     Map<String, String> configMap = new HashMap<String, String>();
+    Map<SystemStreamPartition, String> sspOffset = new HashMap<SystemStreamPartition, String>();
     configMap.put("a", "b");
     Config config = new MapConfig(configMap);
-    Set<SystemStreamPartition> inputSystemStreamPartitions = new HashSet<SystemStreamPartition>();
-    inputSystemStreamPartitions.add(new SystemStreamPartition("foo", "bar", new Partition(1)));
     TaskName taskName = new TaskName("test");
-    TaskModel taskModel = new TaskModel(taskName, inputSystemStreamPartitions, new Partition(2));
+    sspOffset.put(new SystemStreamPartition("foo", "bar", new Partition(1)), "");
+    TaskModel taskModel = new TaskModel(taskName, sspOffset, new Partition(2));
     Map<TaskName, TaskModel> tasks = new HashMap<TaskName, TaskModel>();
     tasks.put(taskName, taskModel);
     ContainerModel containerModel = new ContainerModel(1, tasks);

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/resources/test.properties
----------------------------------------------------------------------
diff --git a/samza-core/src/test/resources/test.properties b/samza-core/src/test/resources/test.properties
index 9348c7d..41eb82e 100644
--- a/samza-core/src/test/resources/test.properties
+++ b/samza-core/src/test/resources/test.properties
@@ -22,3 +22,4 @@
 job.factory.class=org.apache.samza.job.MockJobFactory
 job.name=test-job
 foo=bar
+systems.coordinator.samza.factory=org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
\ No newline at end of file
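
Taken together with the job.name and job.coordinator.system keys that
TestCheckpointTool sets in its setup below, the minimal configuration a test needs to
bootstrap a mock coordinator system is roughly the following sketch (assuming
"job.coordinator.system" is the literal key behind JobConfig.JOB_COORDINATOR_SYSTEM):

    job.name=test-job
    job.coordinator.system=coordinator
    systems.coordinator.samza.factory=org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory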

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
index af800df..0ba932c 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
@@ -31,8 +31,9 @@ import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.scalatest.junit.AssertionsForJUnit
 import org.scalatest.mock.MockitoSugar
-
 import scala.collection.JavaConversions._
+import org.apache.samza.config.JobConfig
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
 
 object TestCheckpointTool {
   var checkpointManager: CheckpointManager = null
@@ -40,8 +41,8 @@ object TestCheckpointTool {
   var systemProducer: SystemProducer = null
   var systemAdmin: SystemAdmin = null
 
-  class MockCheckpointManagerFactory extends CheckpointManagerFactory {
-    override def getCheckpointManager(config: Config, registry: MetricsRegistry) = checkpointManager
+  class MockCheckpointManagerFactory {
+    def getCheckpointManager(config: Config, registry: MetricsRegistry) = checkpointManager
   }
 
   class MockSystemFactory extends SystemFactory {
@@ -62,15 +63,17 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
   @Before
   def setup {
     config = new MapConfig(Map(
+      JobConfig.JOB_NAME -> "test",
+      JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
       TaskConfig.INPUT_STREAMS -> "test.foo",
       TaskConfig.CHECKPOINT_MANAGER_FACTORY -> classOf[MockCheckpointManagerFactory].getName,
-      SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getName
+      SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getName,
+      SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName
     ))
     val metadata = new SystemStreamMetadata("foo", Map[Partition, SystemStreamPartitionMetadata](
       new Partition(0) -> new SystemStreamPartitionMetadata("0", "100", "101"),
       new Partition(1) -> new SystemStreamPartitionMetadata("0", "200", "201")
     ))
-
     TestCheckpointTool.checkpointManager = mock[CheckpointManager]
     TestCheckpointTool.systemAdmin = mock[SystemAdmin]
     when(TestCheckpointTool.systemAdmin.getSystemStreamMetadata(Set("foo")))
@@ -79,12 +82,12 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
       .thenReturn(new Checkpoint(Map(new SystemStreamPartition("test", "foo", p0) -> "1234")))
     when(TestCheckpointTool.checkpointManager.readLastCheckpoint(tn1))
       .thenReturn(new Checkpoint(Map(new SystemStreamPartition("test", "foo", p1) -> "4321")))
-
   }
 
   @Test
   def testReadLatestCheckpoint {
-    new CheckpointTool(config, null).run
+    val checkpointTool = new CheckpointTool(config, null, TestCheckpointTool.checkpointManager)
+    checkpointTool.run
     verify(TestCheckpointTool.checkpointManager).readLastCheckpoint(tn0)
     verify(TestCheckpointTool.checkpointManager).readLastCheckpoint(tn1)
     verify(TestCheckpointTool.checkpointManager, never()).writeCheckpoint(any(), any())
@@ -95,7 +98,8 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
     val toOverwrite = Map(tn0 -> Map(new SystemStreamPartition("test", "foo", p0) -> "42"),
       tn1 -> Map(new SystemStreamPartition("test", "foo", p1) -> "43"))
 
-    new CheckpointTool(config, toOverwrite).run
+    val checkpointTool = new CheckpointTool(config, toOverwrite, TestCheckpointTool.checkpointManager)
+    checkpointTool.run
     verify(TestCheckpointTool.checkpointManager)
       .writeCheckpoint(tn0, new Checkpoint(Map(new SystemStreamPartition("test", "foo", p0) -> "42")))
     verify(TestCheckpointTool.checkpointManager)

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index a281e79..8d54c46 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -65,7 +65,7 @@ class TestOffsetManager {
     val config = new MapConfig
     val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
     val systemAdmins = Map("test-system" -> getSystemAdmin)
-    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins)
+    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins, new OffsetManagerMetrics, checkpointManager.getOffsets)
     offsetManager.register(taskName, Set(systemStreamPartition))
     offsetManager.start
     assertTrue(checkpointManager.isStarted)
@@ -97,7 +97,7 @@ class TestOffsetManager {
     val config = new MapConfig
     val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
     val systemAdmins = Map("test-system" -> getSystemAdmin)
-    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins)
+    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins, new OffsetManagerMetrics, checkpointManager.getOffsets)
     offsetManager.register(taskName, Set(systemStreamPartition))
     offsetManager.start
     // Should get offset 45 back from the checkpoint manager, which is last processed, and system admin should return 46 as starting offset.
@@ -179,22 +179,6 @@ class TestOffsetManager {
     }
   }
 
-  @Ignore("OffsetManager.start is supposed to throw an exception - but it doesn't") @Test
-  def testShouldFailWhenMissingDefault {
-    val taskName = new TaskName("c")
-    val systemStream = new SystemStream("test-system", "test-stream")
-    val partition = new Partition(0)
-    val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
-    val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
-    val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
-    val offsetManager = OffsetManager(systemStreamMetadata, new MapConfig(Map[String, String]()))
-    offsetManager.register(taskName, Set(systemStreamPartition))
-
-    intercept[SamzaException] {
-      offsetManager.start
-    }
-  }
-
   @Test
   def testDefaultSystemShouldFailWhenFailIsSpecified {
     val systemStream = new SystemStream("test-system", "test-stream")
@@ -255,24 +239,26 @@ class TestOffsetManager {
     assertEquals(Some("13"), offsetManager.getStartingOffset(ssp))
   }
 
+
   private def getCheckpointManager(systemStreamPartition: SystemStreamPartition, taskName:TaskName = new TaskName("taskName")) = {
     val checkpoint = new Checkpoint(Map(systemStreamPartition -> "45"))
-
-    new CheckpointManager {
+    new CheckpointManager(null, null, null) {
       var isStarted = false
       var isStopped = false
       var registered = Set[TaskName]()
       var checkpoints: Map[TaskName, Checkpoint] = Map(taskName -> checkpoint)
       var taskNameToPartitionMapping: util.Map[TaskName, java.lang.Integer] = new util.HashMap[TaskName, java.lang.Integer]()
-      def start { isStarted = true }
-      def register(taskName: TaskName) { registered += taskName }
-      def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) { checkpoints += taskName -> checkpoint }
-      def readLastCheckpoint(taskName: TaskName) = checkpoints.getOrElse(taskName, null)
-      def stop { isStopped = true }
-
-      override def writeChangeLogPartitionMapping(mapping: util.Map[TaskName, java.lang.Integer]): Unit = taskNameToPartitionMapping = mapping
+      override def start { isStarted = true }
+      override def register(taskName: TaskName) { registered += taskName }
+      override def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) { checkpoints += taskName -> checkpoint }
+      override def readLastCheckpoint(taskName: TaskName) = checkpoints.getOrElse(taskName, null)
+      override def stop { isStopped = true }
 
-      override def readChangeLogPartitionMapping(): util.Map[TaskName, java.lang.Integer] = taskNameToPartitionMapping
+      // Only for testing purposes - not present in the actual checkpoint manager.
+      def getOffsets: util.Map[SystemStreamPartition, String] = checkpoint.getOffsets()
     }
   }
 
@@ -284,8 +270,12 @@ class TestOffsetManager {
       def getSystemStreamMetadata(streamNames: java.util.Set[String]) =
         Map[String, SystemStreamMetadata]()
 
-      override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) = {
-        new SamzaException("Method not implemented")
+      override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) {
+        throw new UnsupportedOperationException("Method not implemented.")
+      }
+
+      override def createCoordinatorStream(streamName: String) {
+        throw new UnsupportedOperationException("Method not implemented.")
       }
     }
   }