You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2018/02/16 22:01:12 UTC

[1/2] samza git commit: SAMZA-1555: Move creation of checkpoint and changelog streams to the Job Coordinators

Repository: samza
Updated Branches:
  refs/heads/master 718c66612 -> 6dc89e100


http://git-wip-us.apache.org/repos/asf/samza/blob/6dc89e10/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 4a804dd..a68fb6f 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -27,6 +27,7 @@ import org.apache.samza.config._
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.config.Config
 import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
 import org.apache.samza.container.grouper.task.BalancingTaskNameGrouper
 import org.apache.samza.container.grouper.task.TaskNameGrouperFactory
@@ -34,19 +35,15 @@ import org.apache.samza.container.LocalityManager
 import org.apache.samza.container.TaskName
 import org.apache.samza.coordinator.server.HttpServer
 import org.apache.samza.coordinator.server.JobServlet
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer
+import org.apache.samza.coordinator.stream.CoordinatorStreamManager
 import org.apache.samza.job.model.JobModel
 import org.apache.samza.job.model.TaskModel
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.storage.ChangelogPartitionManager
 import org.apache.samza.system._
 import org.apache.samza.util.Logging
 import org.apache.samza.util.Util
-import org.apache.samza.{Partition, PartitionChangeException, SamzaException}
+import org.apache.samza.Partition
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
 
 /**
  * Helper companion object that is responsible for wiring up a JobModelManager
@@ -64,80 +61,32 @@ object JobModelManager extends Logging {
   /**
    * Does the following actions for a job.
    * a) Reads the jobModel from coordinator stream using the job's configuration.
-   * b) Creates changeLogStream for task stores if it does not exists.
-   * c) Recomputes changelog partition mapping based on jobModel and job's configuration
-   * and writes it to the coordinator stream.
-   * d) Builds JobModelManager using the jobModel read from coordinator stream.
-   * @param coordinatorSystemConfig A config object that contains job.name
-   *                                job.id, and all system.<job-coordinator-system-name>.*Ch
-   *                                configuration. The method will use this config to read all configuration
-   *                                from the coordinator stream, and instantiate a JobModelManager.
+   * b) Recomputes changelog partition mapping based on jobModel and job's configuration.
+   * c) Builds JobModelManager using the jobModel read from coordinator stream.
+   * @param coordinatorStreamManager Coordinator stream manager.
+   * @param changelogPartitionMapping The changelog partition-to-task mapping.
+   * @return JobModelManager
    */
-  def apply(coordinatorSystemConfig: Config, metricsRegistryMap: MetricsRegistryMap): JobModelManager = {
-    val coordinatorSystemConsumer: CoordinatorStreamSystemConsumer = new CoordinatorStreamSystemConsumer(coordinatorSystemConfig, metricsRegistryMap)
-    val coordinatorSystemProducer: CoordinatorStreamSystemProducer = new CoordinatorStreamSystemProducer(coordinatorSystemConfig, metricsRegistryMap)
-    info("Registering coordinator system stream consumer.")
-    coordinatorSystemConsumer.register
-    debug("Starting coordinator system stream consumer.")
-    coordinatorSystemConsumer.start
-    debug("Bootstrapping coordinator system stream consumer.")
-    coordinatorSystemConsumer.bootstrap
-    info("Registering coordinator system stream producer.")
-    coordinatorSystemProducer.register(SOURCE)
-
-    val config = coordinatorSystemConsumer.getConfig
-    info("Got config: %s" format config)
-    val changelogManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, SOURCE)
-    changelogManager.start()
-
-    val localityManager = new LocalityManager(coordinatorSystemProducer, coordinatorSystemConsumer)
-    // We don't need to start() localityManager as they share the same instances with checkpoint and changelog managers.
-    // TODO: This code will go away with refactoring - SAMZA-678
-
-    localityManager.start()
-
-    // Map the name of each system to the corresponding SystemAdmin
+  def apply(coordinatorStreamManager: CoordinatorStreamManager, changelogPartitionMapping: util.Map[TaskName, Integer]) = {
+    val localityManager = new LocalityManager(coordinatorStreamManager)
+
+    val config = coordinatorStreamManager.getConfig
+
+      // Map the name of each system to the corresponding SystemAdmin
     val systemAdmins = new SystemAdmins(config)
     val streamMetadataCache = new StreamMetadataCache(systemAdmins, 0)
-    val previousChangelogPartitionMapping = changelogManager.readChangeLogPartitionMapping()
 
-    val processorList = new ListBuffer[String]()
     val containerCount = new JobConfig(config).getContainerCount
-    for (i <- 0 until containerCount) {
-      processorList += i.toString
-    }
-    systemAdmins.start()
-    val jobModelManager = getJobModelManager(config, previousChangelogPartitionMapping, localityManager, streamMetadataCache, processorList.toList.asJava)
-    val jobModel = jobModelManager.jobModel
-    // Save the changelog mapping back to the ChangelogPartitionmanager
-    // newChangelogPartitionMapping is the merging of all current task:changelog
-    // assignments with whatever we had before (previousChangelogPartitionMapping).
-    // We must persist legacy changelog assignments so that
-    // maxChangelogPartitionId always has the absolute max, not the current
-    // max (in case the task with the highest changelog partition mapping
-    // disappears.
-    val newChangelogPartitionMapping = jobModel.getContainers.asScala.flatMap(_._2.getTasks.asScala).map{case (taskName,taskModel) => {
-      taskName -> Integer.valueOf(taskModel.getChangelogPartition.getPartitionId)
-    }}.toMap ++ previousChangelogPartitionMapping.asScala
-    info("Saving task-to-changelog partition mapping: %s" format newChangelogPartitionMapping)
-    changelogManager.writeChangeLogPartitionMapping(newChangelogPartitionMapping.asJava)
-
-    createChangeLogStreams(config, jobModel.maxChangeLogStreamPartitions, systemAdmins)
-    createAccessLogStreams(config, jobModel.maxChangeLogStreamPartitions, systemAdmins)
+    val processorList = List.range(0, containerCount).map(c => c.toString)
 
+    systemAdmins.start()
+    val jobModelManager = getJobModelManager(config, changelogPartitionMapping, localityManager, streamMetadataCache, processorList.asJava)
     systemAdmins.stop()
+
     jobModelManager
   }
 
   /**
-    * This method creates a {@link JobModelManager} object w/o {@link StreamPartitionCountMonitor}
-    *
-    * @param coordinatorSystemConfig configuration for coordinator system
-    * @return a JobModelManager object
-    */
-  def apply(coordinatorSystemConfig: Config): JobModelManager = apply(coordinatorSystemConfig, new MetricsRegistryMap())
-
-  /**
    * Build a JobModelManager using a Samza job's configuration.
    */
   private def getJobModelManager(config: Config,
@@ -268,44 +217,6 @@ object JobModelManager extends Logging {
     }
   }
 
-  def createChangeLogStreams(config: StorageConfig, changeLogPartitions: Int, systemAdmins: SystemAdmins) {
-    val changeLogSystemStreams = config
-      .getStoreNames
-      .filter(config.getChangelogStream(_).isDefined)
-      .map(name => (name, config.getChangelogStream(name).get)).toMap
-      .mapValues(Util.getSystemStreamFromNames(_))
-
-    for ((storeName, systemStream) <- changeLogSystemStreams) {
-      val systemAdmin = systemAdmins.getSystemAdmin(systemStream.getSystem)
-
-      val changelogSpec = StreamSpec.createChangeLogStreamSpec(systemStream.getStream, systemStream.getSystem, changeLogPartitions)
-      if (systemAdmin.createStream(changelogSpec)) {
-        info("Created changelog stream %s." format systemStream.getStream)
-      } else {
-        info("Changelog stream %s already exists." format systemStream.getStream)
-      }
-      systemAdmin.validateStream(changelogSpec)
-    }
-  }
-
-  private def createAccessLogStreams(config: StorageConfig, changeLogPartitions: Int, systemAdmins: SystemAdmins): Unit = {
-    val changeLogSystemStreams = config
-      .getStoreNames
-      .filter(config.getChangelogStream(_).isDefined)
-      .map(name => (name, config.getChangelogStream(name).get)).toMap
-      .mapValues(Util.getSystemStreamFromNames(_))
-
-    for ((storeName, systemStream) <- changeLogSystemStreams) {
-      val accessLog = config.getAccessLogEnabled(storeName)
-      if (accessLog) {
-        val systemAdmin = systemAdmins.getSystemAdmin(systemStream.getSystem)
-        val accessLogSpec = new StreamSpec(config.getAccessLogStream(systemStream.getStream),
-          config.getAccessLogStream(systemStream.getStream), systemStream.getSystem, changeLogPartitions)
-        systemAdmin.createStream(accessLogSpec)
-      }
-    }
-  }
-
   private def getSystemNames(config: Config) = config.getSystemNames.toSet
 }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc89e10/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
index 7a31567..fe679d3 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
@@ -19,28 +19,45 @@
 
 package org.apache.samza.job.local
 
-
-import java.io.File
-
 import org.apache.samza.SamzaException
-import org.apache.samza.config.{JobConfig, Config}
+import org.apache.samza.config.{Config, JobConfig, TaskConfigJava}
 import org.apache.samza.config.TaskConfig._
 import org.apache.samza.coordinator.JobModelManager
+import org.apache.samza.coordinator.stream.CoordinatorStreamManager
 import org.apache.samza.job.{CommandBuilder, ShellCommandBuilder, StreamJob, StreamJobFactory}
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.storage.ChangelogStreamManager
 import org.apache.samza.util.{Logging, Util}
 
 /**
  * Creates a stand alone ProcessJob with the specified config.
  */
 class ProcessJobFactory extends StreamJobFactory with Logging {
-  def  getJob(config: Config): StreamJob = {
+  def getJob(config: Config): StreamJob = {
     val containerCount = JobConfig.Config2Job(config).getContainerCount
 
     if (containerCount > 1) {
       throw new SamzaException("Container count larger than 1 is not supported for ProcessJobFactory")
     }
-    
-    val coordinator = JobModelManager(config)
+
+    val metricsRegistry = new MetricsRegistryMap()
+    val coordinatorStreamManager = new CoordinatorStreamManager(config, metricsRegistry)
+    coordinatorStreamManager.register(getClass.getSimpleName)
+    coordinatorStreamManager.start
+    coordinatorStreamManager.bootstrap
+    val changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager)
+
+    val coordinator = JobModelManager(coordinatorStreamManager, changelogStreamManager.readPartitionMapping())
+    val jobModel = coordinator.jobModel
+    changelogStreamManager.writePartitionMapping(jobModel.getTaskPartitionMappings)
+
+    //create necessary checkpoint and changelog streams
+    val checkpointManager = new TaskConfigJava(jobModel.getConfig).getCheckpointManager(metricsRegistry)
+    if (checkpointManager != null) {
+      checkpointManager.createResources()
+    }
+    ChangelogStreamManager.createChangelogStreams(jobModel.getConfig, jobModel.maxChangeLogStreamPartitions)
+
     val containerModel = coordinator.jobModel.getContainers.get(0)
 
     val fwkPath = JobConfig.getFwkPath(config) // see if split deployment is configured

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc89e10/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index 6de4ce0..5ab4827 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -19,14 +19,16 @@
 
 package org.apache.samza.job.local
 
-import org.apache.samza.config.Config
+import org.apache.samza.config.{Config, TaskConfigJava}
 import org.apache.samza.config.JobConfig._
 import org.apache.samza.config.ShellCommandConfig._
-import org.apache.samza.container.{SamzaContainerListener, SamzaContainer}
+import org.apache.samza.container.{SamzaContainer, SamzaContainerListener}
 import org.apache.samza.coordinator.JobModelManager
+import org.apache.samza.coordinator.stream.CoordinatorStreamManager
 import org.apache.samza.job.{StreamJob, StreamJobFactory}
-import org.apache.samza.metrics.{JmxServer, MetricsReporter}
+import org.apache.samza.metrics.{JmxServer, MetricsRegistryMap, MetricsReporter}
 import org.apache.samza.runtime.LocalContainerRunner
+import org.apache.samza.storage.ChangelogStreamManager
 import org.apache.samza.task.TaskFactoryUtil
 import org.apache.samza.util.Logging
 
@@ -36,8 +38,25 @@ import org.apache.samza.util.Logging
 class ThreadJobFactory extends StreamJobFactory with Logging {
   def getJob(config: Config): StreamJob = {
     info("Creating a ThreadJob, which is only meant for debugging.")
-    val coordinator = JobModelManager(config)
+
+    val metricsRegistry = new MetricsRegistryMap()
+    val coordinatorStreamManager = new CoordinatorStreamManager(config, metricsRegistry)
+    coordinatorStreamManager.register(getClass.getSimpleName)
+    coordinatorStreamManager.start
+    coordinatorStreamManager.bootstrap
+    val changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager)
+
+    val coordinator = JobModelManager(coordinatorStreamManager, changelogStreamManager.readPartitionMapping())
     val jobModel = coordinator.jobModel
+    changelogStreamManager.writePartitionMapping(jobModel.getTaskPartitionMappings)
+
+    //create necessary checkpoint and changelog streams
+    val checkpointManager = new TaskConfigJava(jobModel.getConfig).getCheckpointManager(metricsRegistry)
+    if (checkpointManager != null) {
+      checkpointManager.createResources()
+    }
+    ChangelogStreamManager.createChangelogStreams(jobModel.getConfig, jobModel.maxChangeLogStreamPartitions)
+
     val containerId = "0"
     val jmxServer = new JmxServer
     val streamApp = TaskFactoryUtil.createStreamApplication(config)
@@ -63,6 +82,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
 
       }
     }
+
     try {
       coordinator.start
       val container = SamzaContainer(

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc89e10/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java b/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java
index 07f721d..051ff13 100644
--- a/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java
+++ b/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java
@@ -21,6 +21,7 @@ package org.apache.samza.container;
 
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.coordinator.stream.CoordinatorStreamManager;
 import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
 import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory.MockCoordinatorStreamSystemConsumer;
 import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory.MockCoordinatorStreamSystemProducer;
@@ -66,21 +67,15 @@ public class TestLocalityManager {
         mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config, null);
     MockCoordinatorStreamSystemConsumer consumer =
         mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(config, null);
-    LocalityManager localityManager = new LocalityManager(producer, consumer);
+    CoordinatorStreamManager coordinatorStreamManager = new CoordinatorStreamManager(producer, consumer);
+    LocalityManager localityManager = new LocalityManager(coordinatorStreamManager);
 
-    try {
-      localityManager.register(new TaskName("task-0"));
-      fail("Should have thrown UnsupportedOperationException");
-    } catch (UnsupportedOperationException uoe) {
-      // expected
-    }
-
-    localityManager.register("containerId-0");
+    coordinatorStreamManager.register("SamzaContainer-containerId-0");
     assertTrue(producer.isRegistered());
     assertEquals(producer.getRegisteredSource(), "SamzaContainer-containerId-0");
     assertTrue(consumer.isRegistered());
 
-    localityManager.start();
+    coordinatorStreamManager.start();
     assertTrue(producer.isStarted());
     assertTrue(consumer.isStarted());
 
@@ -101,7 +96,7 @@ public class TestLocalityManager {
       };
     assertEquals(expectedMap, localMap);
 
-    localityManager.stop();
+    coordinatorStreamManager.stop();
     assertTrue(producer.isStopped());
     assertTrue(consumer.isStopped());
   }
@@ -109,13 +104,14 @@ public class TestLocalityManager {
   @Test public void testWriteOnlyLocalityManager() {
     MockCoordinatorStreamSystemProducer producer =
         mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config, null);
-    LocalityManager localityManager = new LocalityManager(producer);
+    CoordinatorStreamManager coordinatorStreamManager = new CoordinatorStreamManager(producer);
+    LocalityManager localityManager = new LocalityManager(coordinatorStreamManager);
 
-    localityManager.register("containerId-1");
+    coordinatorStreamManager.register("SamzaContainer-containerId-1");
     assertTrue(producer.isRegistered());
     assertEquals(producer.getRegisteredSource(), "SamzaContainer-containerId-1");
 
-    localityManager.start();
+    coordinatorStreamManager.start();
     assertTrue(producer.isStarted());
 
     localityManager.writeContainerToHostMapping("1", "localhost", "jmx:localhost:8181", "jmx:tunnel:localhost:9191");
@@ -134,7 +130,7 @@ public class TestLocalityManager {
             "jmx:tunnel:localhost:9191");
     assertEquals(expectedContainerMap, coordinatorStreamMessage);
 
-    localityManager.stop();
+    coordinatorStreamManager.stop();
     assertTrue(producer.isStopped());
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc89e10/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
index 1b5c904..7b57a8b 100644
--- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.coordinator.stream.CoordinatorStreamManager;
 import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
 import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory.MockCoordinatorStreamSystemConsumer;
 import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory.MockCoordinatorStreamSystemProducer;
@@ -60,12 +61,14 @@ public class TestTaskAssignmentManager {
     MockCoordinatorStreamSystemConsumer consumer =
         mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(config, null);
     consumer.register();
-    TaskAssignmentManager taskAssignmentManager = new TaskAssignmentManager(producer, consumer);
+    CoordinatorStreamManager
+        coordinatorStreamManager = new CoordinatorStreamManager(producer, consumer);
+    TaskAssignmentManager taskAssignmentManager = new TaskAssignmentManager(coordinatorStreamManager);
 
     assertTrue(producer.isRegistered());
     assertEquals(producer.getRegisteredSource(), "SamzaTaskAssignmentManager");
 
-    taskAssignmentManager.start();
+    coordinatorStreamManager.start();
     assertTrue(producer.isStarted());
     assertTrue(consumer.isStarted());
 
@@ -88,7 +91,7 @@ public class TestTaskAssignmentManager {
 
     assertEquals(expectedMap, localMap);
 
-    taskAssignmentManager.stop();
+    coordinatorStreamManager.stop();
     assertTrue(producer.isStopped());
     assertTrue(consumer.isStopped());
   }
@@ -101,12 +104,14 @@ public class TestTaskAssignmentManager {
     MockCoordinatorStreamSystemConsumer consumer =
         mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(config, null);
     consumer.register();
-    TaskAssignmentManager taskAssignmentManager = new TaskAssignmentManager(producer, consumer);
+    CoordinatorStreamManager
+        coordinatorStreamManager = new CoordinatorStreamManager(producer, consumer);
+    TaskAssignmentManager taskAssignmentManager = new TaskAssignmentManager(coordinatorStreamManager);
 
     assertTrue(producer.isRegistered());
     assertEquals(producer.getRegisteredSource(), "SamzaTaskAssignmentManager");
 
-    taskAssignmentManager.start();
+    coordinatorStreamManager.start();
     assertTrue(producer.isStarted());
     assertTrue(consumer.isStarted());
 
@@ -129,7 +134,7 @@ public class TestTaskAssignmentManager {
     Map<String, String> deletedMap = taskAssignmentManager.readTaskAssignment();
     assertTrue(deletedMap.isEmpty());
 
-    taskAssignmentManager.stop();
+    coordinatorStreamManager.stop();
     assertTrue(producer.isStopped());
     assertTrue(consumer.isStopped());
   }
@@ -142,12 +147,14 @@ public class TestTaskAssignmentManager {
     MockCoordinatorStreamSystemConsumer consumer =
         mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(config, null);
     consumer.register();
-    TaskAssignmentManager taskAssignmentManager = new TaskAssignmentManager(producer, consumer);
+    CoordinatorStreamManager
+        coordinatorStreamManager = new CoordinatorStreamManager(producer, consumer);
+    TaskAssignmentManager taskAssignmentManager = new TaskAssignmentManager(coordinatorStreamManager);
 
     assertTrue(producer.isRegistered());
     assertEquals(producer.getRegisteredSource(), "SamzaTaskAssignmentManager");
 
-    taskAssignmentManager.start();
+    coordinatorStreamManager.start();
     assertTrue(producer.isStarted());
     assertTrue(consumer.isStarted());
 
@@ -156,7 +163,7 @@ public class TestTaskAssignmentManager {
 
     assertEquals(expectedMap, localMap);
 
-    taskAssignmentManager.stop();
+    coordinatorStreamManager.stop();
     assertTrue(producer.isStopped());
     assertTrue(consumer.isStopped());
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc89e10/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index 3b65a62..cf05b3b 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -42,9 +42,10 @@ import org.apache.samza.job.model.JobModel
 import org.apache.samza.job.model.ContainerModel
 import org.apache.samza.job.model.TaskModel
 import org.apache.samza.config.JobConfig
-import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
-import org.apache.samza.coordinator.stream.MockCoordinatorStreamWrappedConsumer
+import org.apache.samza.coordinator.stream.{CoordinatorStreamManager, MockCoordinatorStreamSystemFactory, MockCoordinatorStreamWrappedConsumer}
 import org.apache.samza.job.MockJobFactory
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.storage.ChangelogStreamManager
 import org.scalatest.{FlatSpec, PrivateMethodTester}
 
 import scala.collection.immutable
@@ -107,7 +108,7 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
     // We want the mocksystemconsumer to use the same instance across runs
     MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
 
-    val coordinator = JobModelManager(new MapConfig((config ++ otherConfigs).asJava))
+    val coordinator = getTestJobModelManager(new MapConfig((config ++ otherConfigs).asJava))
     val expectedJobModel = new JobModel(new MapConfig(config.asJava), containers.asJava)
 
     // Verify that the atomicReference is initialized
@@ -167,7 +168,7 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
     MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
 
     // start the job coordinator and verify if it has all the checkpoints through http port
-    val coordinator = JobModelManager(new MapConfig((config ++ otherConfigs).asJava))
+    val coordinator = getTestJobModelManager(new MapConfig((config ++ otherConfigs).asJava))
     coordinator.start
     val url = coordinator.server.getUrl.toString
 
@@ -192,7 +193,7 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
   @Test
   def testWithPartitionAssignmentWithThreadJobFactory {
     val config = getTestConfig(classOf[ThreadJobFactory])
-    val coordinator = JobModelManager(config)
+    val coordinator = getTestJobModelManager(config)
 
     // Construct the expected JobModel, so we can compare it to
     // JobCoordinator's JobModel.
@@ -213,7 +214,7 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
   @Test
   def testWithPartitionAssignmentWithProcessJobFactory {
     val config = getTestConfig(classOf[ProcessJobFactory])
-    val coordinator = JobModelManager(config)
+    val coordinator = getTestJobModelManager(config)
 
     // Construct the expected JobModel, so we can compare it to
     // JobCoordinator's JobModel.
@@ -279,6 +280,15 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
     }}.toMap
   }
 
+  def getTestJobModelManager(config: MapConfig) = {
+    val coordinatorStreamManager = new CoordinatorStreamManager(config, new MetricsRegistryMap)
+    coordinatorStreamManager.register("TestJobCoordinator")
+    coordinatorStreamManager.start
+    coordinatorStreamManager.bootstrap
+    val changelogPartitionManager = new ChangelogStreamManager(coordinatorStreamManager)
+    JobModelManager(coordinatorStreamManager, changelogPartitionManager.readPartitionMapping())
+  }
+
   @Before
   def setUp() {
     // setup the test stream metadata

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc89e10/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 965209d..2dd9569 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -71,13 +71,10 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
   var taskNames = Set[TaskName]()
   var taskNamesToCheckpoints: Map[TaskName, Checkpoint] = null
 
-
   /**
-    * @inheritdoc
+    * Create checkpoint stream prior to start.
     */
-  override def start {
-    Preconditions.checkNotNull(systemProducer)
-    Preconditions.checkNotNull(systemConsumer)
+  override def createResources = {
     Preconditions.checkNotNull(systemAdmin)
 
     systemAdmin.start()
@@ -86,6 +83,19 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
       s"partition count: ${checkpointSpec.getPartitionCount}")
     systemAdmin.createStream(checkpointSpec)
 
+    if (validateCheckpoint) {
+      info(s"Validating checkpoint stream")
+      systemAdmin.validateStream(checkpointSpec)
+    }
+  }
+
+  /**
+    * @inheritdoc
+    */
+  override def start {
+    Preconditions.checkNotNull(systemProducer)
+    Preconditions.checkNotNull(systemConsumer)
+
     // register and start a producer for the checkpoint topic
     systemProducer.start
 
@@ -94,11 +104,6 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
     info(s"Starting checkpoint SystemConsumer from oldest offset $oldestOffset")
     systemConsumer.register(checkpointSsp, oldestOffset)
     systemConsumer.start
-
-    if (validateCheckpoint) {
-      info(s"Validating checkpoint stream")
-      systemAdmin.validateStream(checkpointSpec)
-    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc89e10/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManagerJava.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManagerJava.java b/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManagerJava.java
index 8a3a2e1..49f0c39 100644
--- a/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManagerJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManagerJava.java
@@ -75,6 +75,7 @@ public class TestKafkaCheckpointManagerJava {
         true, mock(Config.class), mock(MetricsRegistry.class), null, new KafkaCheckpointLogKeySerde());
 
     // expect an exception during startup
+    checkpointManager.createResources();
     checkpointManager.start();
   }
 
@@ -93,6 +94,7 @@ public class TestKafkaCheckpointManagerJava {
         true, mock(Config.class), mock(MetricsRegistry.class), null, new KafkaCheckpointLogKeySerde());
 
     // expect an exception during startup
+    checkpointManager.createResources();
     checkpointManager.start();
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc89e10/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 3761ea1..5586a1a 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -74,6 +74,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
     val checkpointTopic = "checkpoint-topic-1"
     val kcm1 = createKafkaCheckpointManager(checkpointTopic)
     kcm1.register(taskName)
+    kcm1.createResources
     kcm1.start
     kcm1.stop
     // check that start actually creates the topic with log compaction enabled
@@ -130,6 +131,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
     // create topic with the wrong number of partitions
     createTopic(checkpointTopic, 8, new KafkaConfig(config).getCheckpointTopicProperties())
     try {
+      kcm1.createResources
       kcm1.start
       fail("Expected an exception for invalid number of partitions in the checkpoint topic.")
     } catch {

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc89e10/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
index 4d52877..5bba683 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
@@ -34,6 +34,7 @@ import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StorageConfig;
 import org.apache.samza.container.LocalityManager;
+import org.apache.samza.coordinator.stream.CoordinatorStreamManager;
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.metrics.MetricsRegistryMap;
@@ -129,7 +130,8 @@ public class SamzaTaskProxy implements TaskProxy {
    * @return list of {@link Task} constructed from job model in coordinator stream.
    */
   protected List<Task> readTasksFromCoordinatorStream(CoordinatorStreamSystemConsumer consumer) {
-    LocalityManager localityManager = new LocalityManager(null, consumer);
+    CoordinatorStreamManager coordinatorStreamManager = new CoordinatorStreamManager(consumer);
+    LocalityManager localityManager = new LocalityManager(coordinatorStreamManager);
     Map<String, Map<String, String>> containerIdToHostMapping = localityManager.readContainerLocality();
     Map<String, String> taskNameToContainerIdMapping = localityManager.getTaskAssignmentManager().readTaskAssignment();
     StorageConfig storageConfig = new StorageConfig(consumer.getConfig());

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc89e10/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
index d9261ad..a381a59 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
@@ -28,21 +28,24 @@ import kafka.admin.AdminUtils
 import kafka.consumer.{Consumer, ConsumerConfig}
 import kafka.message.MessageAndMetadata
 import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{TestUtils, CoreUtils, ZkUtils}
+import kafka.utils.{CoreUtils, TestUtils, ZkUtils}
 import kafka.zk.EmbeddedZookeeper
 import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerConfig, ProducerRecord}
 import org.apache.samza.Partition
 import org.apache.samza.checkpoint.Checkpoint
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.security.JaasUtils
-import org.apache.samza.config.{ApplicationConfig, Config, KafkaProducerConfig, MapConfig}
+import org.apache.samza.config._
 import org.apache.samza.container.TaskName
 import org.apache.samza.job.local.ThreadJobFactory
+import org.apache.samza.job.model.{ContainerModel, JobModel}
 import org.apache.samza.job.{ApplicationStatus, JobRunner, StreamJob}
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.storage.ChangelogStreamManager
 import org.apache.samza.system.kafka.TopicMetadataCache
 import org.apache.samza.system.{IncomingMessageEnvelope, SystemStreamPartition}
 import org.apache.samza.task._
-import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtil, TopicMetadataStore}
+import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtil, TopicMetadataStore, Util}
 import org.junit.Assert._
 
 import scala.collection.JavaConverters._
@@ -182,7 +185,7 @@ object StreamTaskTestUtil {
     servers.foreach(server => CoreUtils.delete(server.config.logDirs))
 
     if (zkUtils != null)
-     CoreUtils.swallow(zkUtils.close())
+      CoreUtils.swallow(zkUtils.close())
     if (zookeeper != null)
       CoreUtils.swallow(zookeeper.shutdown())
     Configuration.setConfiguration(null)
@@ -202,7 +205,9 @@ class StreamTaskTestUtil {
    */
   def startJob = {
     // Start task.
-    val job = new JobRunner(new MapConfig(jobConfig.asJava)).run()
+    val jobRunner = new JobRunner(new MapConfig(jobConfig.asJava))
+    val job = jobRunner.run()
+    createStreams
     assertEquals(ApplicationStatus.Running, job.waitForStatus(ApplicationStatus.Running, 60000))
     TestTask.awaitTaskRegistered
     val tasks = TestTask.tasks
@@ -265,6 +270,21 @@ class StreamTaskTestUtil {
     messages.toList
   }
 
+  def createStreams {
+    val mapConfig = new MapConfig(jobConfig.asJava)
+    val containers = new util.HashMap[String, ContainerModel]()
+    val jobModel = new JobModel(mapConfig, containers)
+    jobModel.maxChangeLogStreamPartitions = 1
+
+    val taskConfig = new TaskConfig(jobModel.getConfig)
+    val checkpointManager = taskConfig.getCheckpointManager(new MetricsRegistryMap())
+    checkpointManager match {
+      case Some(checkpointManager) => checkpointManager.createResources
+      case _ => assert(checkpointManager != null, "No checkpoint manager factory configured")
+    }
+
+    ChangelogStreamManager.createChangelogStreams(jobModel.getConfig, jobModel.maxChangeLogStreamPartitions)
+  }
 }
 
 object TestTask {

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc89e10/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
index c10e7fb..e4d47d1 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
@@ -120,7 +120,7 @@ class ShutdownStateStoreTask extends TestTask {
       .asInstanceOf[KeyValueStore[String, String]]
     val iter = store.all
     iter.asScala.foreach( p => restored += (p.getKey -> p.getValue))
-    System.err.println("ShutdownStateStoreTask.init(): %s" format restored)
+    System.err.println("ShutdownStateStoreTask.createStream(): %s" format restored)
     iter.close
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc89e10/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
index 0b76223..85251f1 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
@@ -153,7 +153,7 @@ class StateStoreTestTask extends TestTask {
       .asScala
       .map(_.getValue)
       .toSet
-    System.err.println("StateStoreTestTask.init(): %s" format restored)
+    System.err.println("StateStoreTestTask.createStream(): %s" format restored)
     iter.close
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc89e10/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
index c1b1302..6d02272 100644
--- a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
+++ b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
@@ -36,10 +36,13 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.coordinator.stream.CoordinatorStreamManager;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.job.yarn.ClientHelper;
 import org.apache.samza.metrics.JmxMetricsAccessor;
+import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.MetricsValidator;
+import org.apache.samza.storage.ChangelogStreamManager;
 import org.apache.samza.util.ClassLoaderHelper;
 import org.apache.samza.util.hadoop.HttpFileSystem;
 import org.apache.samza.util.CommandLine;
@@ -149,7 +152,12 @@ public class YarnJobValidationTool {
   }
 
   public void validateJmxMetrics() throws Exception {
-    JobModelManager jobModelManager = JobModelManager.apply(config);
+    CoordinatorStreamManager coordinatorStreamManager = new CoordinatorStreamManager(config, new MetricsRegistryMap());
+    coordinatorStreamManager.register(getClass().getSimpleName());
+    coordinatorStreamManager.start();
+    coordinatorStreamManager.bootstrap();
+    ChangelogStreamManager changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager);
+    JobModelManager jobModelManager = JobModelManager.apply(coordinatorStreamManager, changelogStreamManager.readPartitionMapping());
     validator.init(config);
     Map<String, String> jmxUrls = jobModelManager.jobModel().getAllContainerToHostValues(SetContainerHostMapping.JMX_TUNNELING_URL_KEY);
     for (Map.Entry<String, String> entry : jmxUrls.entrySet()) {
@@ -163,6 +171,7 @@ public class YarnJobValidationTool {
       log.info("validate container " + containerId + " successfully");
     }
     validator.complete();
+    coordinatorStreamManager.stop();
   }
 
   public static void main(String [] args) throws Exception {

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc89e10/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
index 73c7f49..da23b91 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
@@ -27,8 +27,9 @@ import org.apache.samza.clustermanager.SamzaApplicationState
 import org.apache.samza.config.{Config, MapConfig}
 import org.apache.samza.container.TaskName
 import org.apache.samza.coordinator.JobModelManager
-import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
+import org.apache.samza.coordinator.stream.{CoordinatorStreamManager, MockCoordinatorStreamSystemFactory}
 import org.apache.samza.metrics._
+import org.apache.samza.storage.ChangelogStreamManager
 import org.junit.Assert._
 import org.junit.Test
 
@@ -39,7 +40,7 @@ class TestSamzaYarnAppMasterService {
   @Test
   def testAppMasterDashboardShouldStart {
     val config = getDummyConfig
-    val jobModelManager = JobModelManager(config)
+    val jobModelManager = getTestJobModelManager(config)
     val samzaState = new SamzaApplicationState(jobModelManager)
     val registry = new MetricsRegistryMap()
 
@@ -74,7 +75,7 @@ class TestSamzaYarnAppMasterService {
   def testAppMasterDashboardWebServiceShouldStart {
     // Create some dummy config
     val config = getDummyConfig
-    val jobModelManager = JobModelManager(config)
+    val jobModelManager = getTestJobModelManager(config)
     val samzaState = new SamzaApplicationState(jobModelManager)
     val state = new YarnAppState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "testHost", 1, 1);
     val registry = new MetricsRegistryMap()
@@ -99,6 +100,15 @@ class TestSamzaYarnAppMasterService {
     reader.close
   }
 
+  private def getTestJobModelManager(config: Config) = {
+    val coordinatorStreamManager = new CoordinatorStreamManager(config, new MetricsRegistryMap)
+    coordinatorStreamManager.register("TestJobCoordinator")
+    coordinatorStreamManager.start
+    coordinatorStreamManager.bootstrap
+    val changelogPartitionManager = new ChangelogStreamManager(coordinatorStreamManager)
+    JobModelManager(coordinatorStreamManager, changelogPartitionManager.readPartitionMapping())
+  }
+
   private def getDummyConfig: Config = new MapConfig(Map[String, String](
     "job.name" -> "test",
     "yarn.container.count" -> "1",


[2/2] samza git commit: SAMZA-1555: Move creation of checkpoint and changelog streams to the Job Coordinators

Posted by ni...@apache.org.
SAMZA-1555: Move creation of checkpoint and changelog streams to the Job Coordinators

**Overview**
The purpose of this PR is to consolidate the creation of the changelog and checkpoint streams into the JobCoordinators. In the current state, the changelog stream is created from the JobModelManager and the checkpoint stream is created within the OffsetManager. The issue with creating the checkpoint stream in the OffsetManager is that the first creation attempt happens in the first SamzaContainer that runs, and each subsequent SamzaContainer run will attempt to create the checkpoint stream again.

**Motivations**
There are three driving forces for this refactoring. The first motivation is to assign the creation of the changelog and checkpoint streams to the JobCoordinators where it is most appropriate. This was discussed in more detail with nickpan47. The second motivation is to have any potential stream creation failure happen no later than during job coordination. The third motivation is to accommodate future security work to provide a robust way to set ACLs on streams.

The follow-on to this PR is tracked in: https://issues.apache.org/jira/browse/SAMZA-1564

Author: Daniel Nishimura <dn...@gmail.com>

Reviewers: Yi Pan <ni...@gmail.com>, Prateek Maheshwari <pm...@linkedin.com>

Closes #413 from dnishimura/samza-1555-move-changelog-checkpoint-creation and squashes the following commits:

1102314 [Daniel Nishimura] Fix a comment change that Intellij inadvertently refactored.
2b2eb16 [Daniel Nishimura] Trigger CI again
366b7b7 [Daniel Nishimura] Trigger CI
dce36b6 [Daniel Nishimura] Add isBootstrapped flag back to CoordinatorStreamSystemConsumer.
2efcfd4 [Daniel Nishimura] Trigger CI
6b2d912 [Daniel Nishimura] Changes related to Yi's latest review.
9d02e7a [Daniel Nishimura] Change createChangeLogStream to a static method.
effec24 [Daniel Nishimura] Cleanup from latest code review.
1bdfda7 [Daniel Nishimura] CoordinatorStreamManager lifecycle refactoring.
5178ca5 [Daniel Nishimura] Refactor AbstractCoordinatorStreamManager as a concrete class.
894af9c [Daniel Nishimura] Merge from master branch.
4009a0b [Daniel Nishimura] Changes from code review.
3a12a75 [Daniel Nishimura] Separation of changelog manager and jobmodel manager. Create CoordinatorStream class to encapsulate creation and management of coordinator stream consumer and producer.
c188adb [Daniel Nishimura] Merge from master branch.
971fa91 [Daniel Nishimura] Move the responsibility of changelog and checkpoint stream creation to the job coordinators.


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6dc89e10
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6dc89e10
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6dc89e10

Branch: refs/heads/master
Commit: 6dc89e100998a1b1d20681e8fb63803398c904c4
Parents: 718c666
Author: Daniel Nishimura <dn...@gmail.com>
Authored: Fri Feb 16 14:01:06 2018 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Fri Feb 16 14:01:06 2018 -0800

----------------------------------------------------------------------
 .../samza/checkpoint/CheckpointManager.java     |  12 +-
 .../ClusterBasedJobCoordinator.java             |  48 +++++-
 .../apache/samza/config/JavaStorageConfig.java  |  18 +-
 .../apache/samza/config/JavaSystemConfig.java   |  10 ++
 .../org/apache/samza/config/TaskConfigJava.java |  35 +++-
 .../apache/samza/container/LocalityManager.java |  94 ++++------
 .../grouper/task/TaskAssignmentManager.java     |  37 ++--
 .../AbstractCoordinatorStreamManager.java       | 132 --------------
 .../stream/CoordinatorStreamManager.java        | 170 +++++++++++++++++++
 .../stream/CoordinatorStreamSystemConsumer.java |   5 +
 .../stream/CoordinatorStreamSystemProducer.java |   6 +
 .../org/apache/samza/job/model/JobModel.java    |  13 +-
 .../standalone/PassthroughJobCoordinator.java   |   6 +
 .../storage/ChangelogPartitionManager.java      |  89 ----------
 .../samza/storage/ChangelogStreamManager.java   | 154 +++++++++++++++++
 .../apache/samza/storage/StorageRecovery.java   |   9 +-
 .../org/apache/samza/zk/ZkJobCoordinator.java   |  22 ++-
 .../samza/checkpoint/CheckpointTool.scala       |  16 +-
 .../org/apache/samza/config/StorageConfig.scala |   7 +-
 .../org/apache/samza/config/TaskConfig.scala    |  13 +-
 .../apache/samza/container/SamzaContainer.scala |  34 ++--
 .../samza/coordinator/JobModelManager.scala     | 125 ++------------
 .../samza/job/local/ProcessJobFactory.scala     |  31 +++-
 .../samza/job/local/ThreadJobFactory.scala      |  28 ++-
 .../samza/container/TestLocalityManager.java    |  26 ++-
 .../grouper/task/TestTaskAssignmentManager.java |  25 ++-
 .../samza/coordinator/TestJobCoordinator.scala  |  22 ++-
 .../kafka/KafkaCheckpointManager.scala          |  25 +--
 .../kafka/TestKafkaCheckpointManagerJava.java   |   2 +
 .../kafka/TestKafkaCheckpointManager.scala      |   2 +
 .../samza/rest/proxy/task/SamzaTaskProxy.java   |   4 +-
 .../test/integration/StreamTaskTestUtil.scala   |  30 +++-
 .../integration/TestShutdownStatefulTask.scala  |   2 +-
 .../test/integration/TestStatefulTask.scala     |   2 +-
 .../samza/validation/YarnJobValidationTool.java |  11 +-
 .../yarn/TestSamzaYarnAppMasterService.scala    |  16 +-
 36 files changed, 762 insertions(+), 519 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/6dc89e10/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
index bc75351..4dcefaa 100644
--- a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
+++ b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
@@ -26,6 +26,14 @@ import org.apache.samza.container.TaskName;
  * implementation-specific location.
  */
 public interface CheckpointManager {
+  /**
+   * Creates checkpoint stream.
+   */
+  default void createResources() { }
+
+  /**
+   * Perform startup operations.
+   */
   void start();
 
   /**
@@ -57,5 +65,5 @@ public interface CheckpointManager {
   /**
    * Clear the checkpoints in the checkpoint stream.
    */
-  default void clearCheckpoints() { };
-}
\ No newline at end of file
+  default void clearCheckpoints() { }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc89e10/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index 91b94f4..60cc65d 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -19,9 +19,11 @@
 package org.apache.samza.clustermanager;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.util.Map;
 import java.util.Set;
 import org.apache.samza.SamzaException;
 import org.apache.samza.PartitionChangeException;
+import org.apache.samza.checkpoint.CheckpointManager;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
@@ -31,9 +33,12 @@ import org.apache.samza.config.StorageConfig;
 import org.apache.samza.config.TaskConfigJava;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.coordinator.StreamPartitionCountMonitor;
+import org.apache.samza.coordinator.stream.CoordinatorStreamManager;
+import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.JmxServer;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.apache.samza.storage.ChangelogStreamManager;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemStream;
@@ -73,7 +78,6 @@ public class ClusterBasedJobCoordinator {
   private static final Logger log = LoggerFactory.getLogger(ClusterBasedJobCoordinator.class);
 
   private final Config config;
-
   private final ClusterManagerConfig clusterManagerConfig;
 
   /**
@@ -94,6 +98,16 @@ public class ClusterBasedJobCoordinator {
    */
   private final JobModelManager jobModelManager;
 
+  /**
+   * A ChangelogStreamManager to handle creation of changelog stream and map changelog stream partitions
+   */
+  private final ChangelogStreamManager changelogStreamManager;
+
+  /**
+   * Single instance of the coordinator stream to use.
+   */
+  private final CoordinatorStreamManager coordinatorStreamManager;
+
   /*
    * The interval for polling the Task Manager for shutdown.
    */
@@ -147,8 +161,18 @@ public class ClusterBasedJobCoordinator {
 
     metrics = new MetricsRegistryMap();
 
-    //build a JobModelReader and perform partition assignments.
-    jobModelManager = buildJobModelManager(coordinatorSystemConfig, metrics);
+    coordinatorStreamManager = new CoordinatorStreamManager(coordinatorSystemConfig, metrics);
+    // register ClusterBasedJobCoordinator with the CoordinatorStreamManager.
+    coordinatorStreamManager.register(getClass().getSimpleName());
+    // start the coordinator stream's underlying consumer and producer.
+    coordinatorStreamManager.start();
+    // bootstrap current configuration.
+    coordinatorStreamManager.bootstrap();
+
+    // build a JobModelManager and ChangelogStreamManager and perform partition assignments.
+    changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager);
+    jobModelManager = JobModelManager.apply(coordinatorStreamManager, changelogStreamManager.readPartitionMapping());
+
     config = jobModelManager.jobModel().getConfig();
     hasDurableStores = new StorageConfig(config).hasDurableStores();
     state = new SamzaApplicationState(jobModelManager);
@@ -186,6 +210,18 @@ public class ClusterBasedJobCoordinator {
       //initialize JobCoordinator state
       log.info("Starting Cluster Based Job Coordinator");
 
+      //create necessary checkpoint and changelog streams, if not created
+      JobModel jobModel = jobModelManager.jobModel();
+      CheckpointManager checkpointManager = new TaskConfigJava(config).getCheckpointManager(metrics);
+      if (checkpointManager != null) {
+        checkpointManager.createResources();
+      }
+      ChangelogStreamManager.createChangelogStreams(jobModel.getConfig(), jobModel.maxChangeLogStreamPartitions);
+
+      // Remap changelog partitions to tasks
+      Map prevPartitionMappings = changelogStreamManager.readPartitionMapping();
+      changelogStreamManager.updatePartitionMapping(prevPartitionMappings, jobModel.getTaskPartitionMappings());
+
       containerProcessManager.start();
       systemAdmins.start();
       partitionMonitor.start();
@@ -225,6 +261,7 @@ public class ClusterBasedJobCoordinator {
       partitionMonitor.stop();
       systemAdmins.stop();
       containerProcessManager.stop();
+      coordinatorStreamManager.stop();
     } catch (Throwable e) {
       log.error("Exception while stopping task manager {}", e);
     }
@@ -240,11 +277,6 @@ public class ClusterBasedJobCoordinator {
     }
   }
 
-  private JobModelManager buildJobModelManager(Config coordinatorSystemConfig, MetricsRegistryMap registry)  {
-    JobModelManager jobModelManager = JobModelManager.apply(coordinatorSystemConfig, registry);
-    return jobModelManager;
-  }
-
   private StreamPartitionCountMonitor getPartitionCountMonitor(Config config, SystemAdmins systemAdmins) {
     StreamMetadataCache streamMetadata = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
     Set<SystemStream> inputStreamsToMonitor = new TaskConfigJava(config).getAllInputStreams();

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc89e10/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
index 48beec9..bbf0ccf 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
@@ -38,6 +38,10 @@ public class JavaStorageConfig extends MapConfig {
   private static final String MSG_SERDE = "stores.%s.msg.serde";
   private static final String CHANGELOG_STREAM = "stores.%s.changelog";
   private static final String CHANGELOG_SYSTEM = "job.changelog.system";
+  private static final String ACCESSLOG_STREAM_SUFFIX = "access-log";
+  private static final String ACCESSLOG_SAMPLING_RATIO = "stores.%s.accesslog.sampling.ratio";
+  private static final String ACCESSLOG_ENABLED = "stores.%s.accesslog.enabled";
+  private static final int DEFAULT_ACCESSLOG_SAMPLING_RATIO = 50;
 
   public JavaStorageConfig(Config config) {
     super(config);
@@ -80,6 +84,18 @@ public class JavaStorageConfig extends MapConfig {
     return systemStreamRes;
   }
 
+  public boolean getAccessLogEnabled(String storeName) {
+    return getBoolean(String.format(ACCESSLOG_ENABLED, storeName), false);
+  }
+
+  public String getAccessLogStream(String changeLogStream) {
+    return String.format("%s-%s", changeLogStream, ACCESSLOG_STREAM_SUFFIX);
+  }
+
+  public int getAccessLogSamplingRatio(String storeName) {
+    return getInt(String.format(ACCESSLOG_SAMPLING_RATIO, storeName), DEFAULT_ACCESSLOG_SAMPLING_RATIO);
+  }
+
   public String getStorageFactoryClassName(String storeName) {
     return get(String.format(FACTORY, storeName), null);
   }
@@ -105,7 +121,7 @@ public class JavaStorageConfig extends MapConfig {
    *
    * If the former syntax is used, that system name will still be honored. For the latter syntax, this method is used.
    *
-   * @return the name of the system to use by default for all changelogs, if defined. 
+   * @return the name of the system to use by default for all changelogs, if defined.
    */
   public String getChangelogSystem() {
     return get(CHANGELOG_SYSTEM,  get(JobConfig.JOB_DEFAULT_SYSTEM(), null));

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc89e10/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java
index 57707aa..1e51c46 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java
@@ -84,6 +84,16 @@ public class JavaSystemConfig extends MapConfig {
   }
 
   /**
+   * Get {@link SystemAdmin} instance for given system name.
+   *
+   * @param systemName System name
+   * @return SystemAdmin of the system if it exists, otherwise null.
+   */
+  public SystemAdmin getSystemAdmin(String systemName) {
+    return getSystemAdmins().get(systemName);
+  }
+
+  /**
    * Get {@link SystemFactory} instances for all the systems defined in this config.
    *
    * @return a map from system name to {@link SystemFactory}

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc89e10/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
index 04e15dc..91cb9ef 100644
--- a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
+++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
@@ -25,7 +25,11 @@ import java.util.List;
 import java.util.Set;
 import java.util.regex.Pattern;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.Partition;
+import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.checkpoint.CheckpointManagerFactory;
+import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.Util;
@@ -43,14 +47,43 @@ public class TaskConfigJava extends MapConfig {
   public static final String BROADCAST_INPUT_STREAMS = "task.broadcast.inputs";
   private static final String BROADCAST_STREAM_PATTERN = "^[\\d]+$";
   private static final String BROADCAST_STREAM_RANGE_PATTERN = "^\\[[\\d]+\\-[\\d]+\\]$";
-  public static final Logger LOGGER = LoggerFactory.getLogger(TaskConfigJava.class);
 
+  // class name to use when sending offset checkpoints
+  public static final String CHECKPOINT_MANAGER_FACTORY = "task.checkpoint.factory";
+
+  public static final Logger LOGGER = LoggerFactory.getLogger(TaskConfigJava.class);
 
   public TaskConfigJava(Config config) {
     super(config);
   }
 
   /**
+   * Get the name of the checkpoint manager factory
+   *
+   * @return Name of checkpoint manager factory
+   */
+  public String getCheckpointManagerFactoryName() {
+    return get(CHECKPOINT_MANAGER_FACTORY, null);
+  }
+
+  /**
+   * Create the checkpoint manager
+   *
+   * @param metricsRegistry Registry of metrics to use. Can be null if not using metrics.
+   * @return CheckpointManager object if checkpoint manager factory is configured, otherwise null.
+   */
+  public CheckpointManager getCheckpointManager(MetricsRegistry metricsRegistry) {
+    // Initialize checkpoint streams during job coordination
+    String checkpointManagerFactoryName = getCheckpointManagerFactoryName();
+    if (StringUtils.isNotBlank(checkpointManagerFactoryName)) {
+      CheckpointManager checkpointManager =
+          Util.<CheckpointManagerFactory>getObj(checkpointManagerFactoryName).getCheckpointManager(this, metricsRegistry);
+      return checkpointManager;
+    }
+    return null;
+  }
+
+  /**
    * Get the systemStreamPartitions of the broadcast stream. Specifying
    * one partition for one stream or a range of the partitions for one
    * stream is allowed.

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc89e10/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
index 033002c..743c87c 100644
--- a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
@@ -20,10 +20,8 @@
 package org.apache.samza.container;
 
 import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.coordinator.stream.CoordinatorStreamManager;
 import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
-import org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.Collections;
@@ -31,60 +29,28 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 
+
 /**
  * Locality Manager is used to persist and read the container-to-host
  * assignment information from the coordinator stream
  * */
-public class LocalityManager extends AbstractCoordinatorStreamManager {
-  private static final Logger log = LoggerFactory.getLogger(LocalityManager.class);
-  private Map<String, Map<String, String>> containerToHostMapping = new HashMap<>();
-  private final TaskAssignmentManager taskAssignmentManager;
-  private final boolean writeOnly;
-
-  /**
-   * Default constructor that creates a read-write manager
-   *
-   * @param coordinatorStreamProducer producer to the coordinator stream
-   * @param coordinatorStreamConsumer consumer for the coordinator stream
-   */
-  public LocalityManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
-                         CoordinatorStreamSystemConsumer coordinatorStreamConsumer) {
-    super(coordinatorStreamProducer, coordinatorStreamConsumer, "SamzaContainer-");
-    this.writeOnly = coordinatorStreamConsumer == null;
-    this.taskAssignmentManager = new TaskAssignmentManager(coordinatorStreamProducer, coordinatorStreamConsumer);
-  }
-
+public class LocalityManager {
+  private static final String CONTAINER_PREFIX = "SamzaContainer-";
+  private static final Logger LOG = LoggerFactory.getLogger(LocalityManager.class);
 
-  /**
-   * Special constructor that creates a write-only {@link LocalityManager} that only writes
-   * to coordinator stream in {@link SamzaContainer}
-   *
-   * @param coordinatorStreamSystemProducer producer to the coordinator stream
-   */
-  public LocalityManager(CoordinatorStreamSystemProducer coordinatorStreamSystemProducer) {
-    this(coordinatorStreamSystemProducer, null);
-  }
+  private final CoordinatorStreamManager coordinatorStreamManager;
+  private final TaskAssignmentManager taskAssignmentManager;
 
-  /**
-   * This method is not supported in {@link LocalityManager}. Use {@link LocalityManager#register(String)} instead.
-   *
-   * @throws UnsupportedOperationException in the case if a {@link TaskName} is passed
-   */
-  @Override
-  public void register(TaskName taskName) {
-    throw new UnsupportedOperationException("TaskName cannot be registered with LocalityManager");
-  }
+  private Map<String, Map<String, String>> containerToHostMapping = new HashMap<>();
 
   /**
-   * Registers the locality manager with a source suffix that is container id
+   * Constructor that creates a read-write or write-only locality manager.
    *
-   * @param sourceSuffix the source suffix which is a container id
+   * @param coordinatorStreamManager Coordinator stream manager.
    */
-  public void register(String sourceSuffix) {
-    if (!this.writeOnly) {
-      registerCoordinatorStreamConsumer();
-    }
-    registerCoordinatorStreamProducer(getSource() + sourceSuffix);
+  public LocalityManager(CoordinatorStreamManager coordinatorStreamManager) {
+    this.coordinatorStreamManager = coordinatorStreamManager;
+    this.taskAssignmentManager = new TaskAssignmentManager(coordinatorStreamManager);
   }
 
   /**
@@ -94,12 +60,13 @@ public class LocalityManager extends AbstractCoordinatorStreamManager {
    * @return the map of containerId: (hostname, jmxAddress, jmxTunnelAddress)
    */
   public Map<String, Map<String, String>> readContainerLocality() {
-    if (this.writeOnly) {
-      throw new UnsupportedOperationException("Read container locality function is not supported in write-only LocalityManager");
+    if (coordinatorStreamManager == null) {
+      throw new IllegalStateException("No coordinator stream manager to read container locality from.");
     }
 
     Map<String, Map<String, String>> allMappings = new HashMap<>();
-    for (CoordinatorStreamMessage message: getBootstrappedStream(SetContainerHostMapping.TYPE)) {
+    for (CoordinatorStreamMessage message : coordinatorStreamManager.getBootstrappedStream(
+        SetContainerHostMapping.TYPE)) {
       SetContainerHostMapping mapping = new SetContainerHostMapping(message);
       Map<String, String> localityMappings = new HashMap<>();
       localityMappings.put(SetContainerHostMapping.HOST_KEY, mapping.getHostLocality());
@@ -109,9 +76,9 @@ public class LocalityManager extends AbstractCoordinatorStreamManager {
     }
     containerToHostMapping = Collections.unmodifiableMap(allMappings);
 
-    if (log.isDebugEnabled()) {
+    if (LOG.isDebugEnabled()) {
       for (Map.Entry<String, Map<String, String>> entry : containerToHostMapping.entrySet()) {
-        log.debug(String.format("Locality for container %s: %s", entry.getKey(), entry.getValue()));
+        LOG.debug(String.format("Locality for container %s: %s", entry.getKey(), entry.getValue()));
       }
     }
 
@@ -126,16 +93,23 @@ public class LocalityManager extends AbstractCoordinatorStreamManager {
    * @param jmxAddress  the JMX URL address
    * @param jmxTunnelingAddress  the JMX Tunnel URL address
    */
-  public void writeContainerToHostMapping(String containerId, String hostName, String jmxAddress, String jmxTunnelingAddress) {
+  public void writeContainerToHostMapping(String containerId, String hostName, String jmxAddress,
+      String jmxTunnelingAddress) {
+    if (coordinatorStreamManager == null) {
+      throw new IllegalStateException("No coordinator stream manager to write locality info to.");
+    }
+
     Map<String, String> existingMappings = containerToHostMapping.get(containerId);
-    String existingHostMapping = existingMappings != null ? existingMappings.get(SetContainerHostMapping.HOST_KEY) : null;
+    String existingHostMapping =
+        existingMappings != null ? existingMappings.get(SetContainerHostMapping.HOST_KEY) : null;
     if (existingHostMapping != null && !existingHostMapping.equals(hostName)) {
-      log.info("Container {} moved from {} to {}", new Object[]{containerId, existingHostMapping, hostName});
+      LOG.info("Container {} moved from {} to {}", new Object[]{containerId, existingHostMapping, hostName});
     } else {
-      log.info("Container {} started at {}", containerId, hostName);
+      LOG.info("Container {} started at {}", containerId, hostName);
     }
-    send(new SetContainerHostMapping(getSource() + containerId, String.valueOf(containerId), hostName, jmxAddress,
-        jmxTunnelingAddress));
+    coordinatorStreamManager.send(
+        new SetContainerHostMapping(CONTAINER_PREFIX + containerId, String.valueOf(containerId), hostName, jmxAddress,
+            jmxTunnelingAddress));
     Map<String, String> mappings = new HashMap<>();
     mappings.put(SetContainerHostMapping.HOST_KEY, hostName);
     mappings.put(SetContainerHostMapping.JMX_URL_KEY, jmxAddress);
@@ -146,4 +120,8 @@ public class LocalityManager extends AbstractCoordinatorStreamManager {
   public TaskAssignmentManager getTaskAssignmentManager() {
     return taskAssignmentManager;
   }
+
+  public CoordinatorStreamManager getCoordinatorStreamManager() {
+    return coordinatorStreamManager;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc89e10/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
index d33a22b..6ec070a 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
@@ -21,10 +21,7 @@ package org.apache.samza.container.grouper.task;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager;
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
+import org.apache.samza.coordinator.stream.CoordinatorStreamManager;
 import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
 import org.apache.samza.coordinator.stream.messages.Delete;
 import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
@@ -36,32 +33,20 @@ import org.slf4j.LoggerFactory;
  * Task assignment Manager is used to persist and read the task-to-container
  * assignment information from the coordinator stream
  * */
-public class TaskAssignmentManager extends AbstractCoordinatorStreamManager {
+public class TaskAssignmentManager {
   private static final Logger log = LoggerFactory.getLogger(TaskAssignmentManager.class);
   private final Map<String, String> taskNameToContainerId = new HashMap<>();
-  private boolean registered = false;
+  private final CoordinatorStreamManager coordinatorStreamManager;
+  private static final String SOURCE = "SamzaTaskAssignmentManager";
 
   /**
    * Default constructor that creates a read-write manager
    *
-   * @param coordinatorStreamProducer producer to the coordinator stream
-   * @param coordinatorStreamConsumer consumer for the coordinator stream
+   * @param coordinatorStreamManager coordinator stream manager.
    */
-  public TaskAssignmentManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
-                         CoordinatorStreamSystemConsumer coordinatorStreamConsumer) {
-    super(coordinatorStreamProducer, coordinatorStreamConsumer, "SamzaTaskAssignmentManager");
-    register(null);
-  }
-
-  @Override
-  public void register(TaskName taskName) {
-    if (!registered) {
-      // taskName will not be used. This producer is global scope.
-      registerCoordinatorStreamProducer(getSource());
-      // We don't register the consumer because we don't manage the consumer's
-      // lifecycle. Also, we don't need to set any properties on the consumer.
-      registered = true;
-    }
+  public TaskAssignmentManager(CoordinatorStreamManager coordinatorStreamManager) {
+    this.coordinatorStreamManager = coordinatorStreamManager;
+    coordinatorStreamManager.register(SOURCE);
   }
 
   /**
@@ -72,7 +57,7 @@ public class TaskAssignmentManager extends AbstractCoordinatorStreamManager {
    */
   public Map<String, String> readTaskAssignment() {
     taskNameToContainerId.clear();
-    for (CoordinatorStreamMessage message: getBootstrappedStream(SetTaskContainerMapping.TYPE)) {
+    for (CoordinatorStreamMessage message: coordinatorStreamManager.getBootstrappedStream(SetTaskContainerMapping.TYPE)) {
       if (message.isDelete()) {
         taskNameToContainerId.remove(message.getKey());
         log.debug("Got TaskContainerMapping delete message: {}", message);
@@ -105,10 +90,10 @@ public class TaskAssignmentManager extends AbstractCoordinatorStreamManager {
     }
 
     if (containerId == null) {
-      send(new Delete(getSource(), taskName, SetTaskContainerMapping.TYPE));
+      coordinatorStreamManager.send(new Delete(SOURCE, taskName, SetTaskContainerMapping.TYPE));
       taskNameToContainerId.remove(taskName);
     } else {
-      send(new SetTaskContainerMapping(getSource(), taskName, String.valueOf(containerId)));
+      coordinatorStreamManager.send(new SetTaskContainerMapping(SOURCE, taskName, String.valueOf(containerId)));
       taskNameToContainerId.put(taskName, containerId);
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc89e10/samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java
deleted file mode 100644
index 9b0d849..0000000
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.coordinator.stream;
-
-import org.apache.samza.container.TaskName;
-import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
-
-import java.util.Set;
-
-/**
- * Abstract class which handles the common functionality for coordinator stream consumer and producer
- */
-public abstract class AbstractCoordinatorStreamManager {
-  private final CoordinatorStreamSystemProducer coordinatorStreamProducer;
-  private final CoordinatorStreamSystemConsumer coordinatorStreamConsumer;
-  private final String source;
-
-  /**
-   * Creates a new {@link AbstractCoordinatorStreamManager} with a given coordinator stream producer, consumer and with a given source.
-   * @param coordinatorStreamProducer the {@link CoordinatorStreamSystemProducer} which should be used with the {@link AbstractCoordinatorStreamManager}
-   * @param coordinatorStreamConsumer the {@link CoordinatorStreamSystemConsumer} which should be used with the {@link AbstractCoordinatorStreamManager}
-   * @param source ths source for the coordinator stream producer
-   */
-  protected AbstractCoordinatorStreamManager(CoordinatorStreamSystemProducer coordinatorStreamProducer, CoordinatorStreamSystemConsumer coordinatorStreamConsumer, String source) {
-    this.coordinatorStreamProducer = coordinatorStreamProducer;
-    this.coordinatorStreamConsumer = coordinatorStreamConsumer;
-    this.source = source;
-  }
-
-  /**
-   * Starts the underlying coordinator stream producer and consumer.
-   */
-  public void start() {
-    if (coordinatorStreamProducer != null) {
-      coordinatorStreamProducer.start();
-    }
-    if (coordinatorStreamConsumer != null) {
-      coordinatorStreamConsumer.start();
-    }
-  }
-
-  /**
-   * Stops the underlying coordinator stream producer and consumer.
-   */
-  public void stop() {
-    if (coordinatorStreamConsumer != null) {
-      coordinatorStreamConsumer.stop();
-    }
-    if (coordinatorStreamProducer != null) {
-      coordinatorStreamProducer.stop();
-    }
-  }
-
-  /**
-   * Sends a {@link CoordinatorStreamMessage} using the underlying system producer.
-   * @param message message which should be sent to producer
-   */
-  public void send(CoordinatorStreamMessage message) {
-    if (coordinatorStreamProducer == null) {
-      throw new UnsupportedOperationException(String.format("CoordinatorStreamProducer is not initialized in the AbstractCoordinatorStreamManager. "
-          + "manager registered source: %s, input source: %s", this.source, source));
-    }
-    coordinatorStreamProducer.send(message);
-  }
-
-  /**
-   * Returns a set of messages from the bootstrapped stream for a given source.
-   * @param source the source of the given messages
-   * @return a set of {@link CoordinatorStreamMessage} if messages exists for the given source, else an empty set
-   */
-  public Set<CoordinatorStreamMessage> getBootstrappedStream(String source) {
-    if (coordinatorStreamConsumer == null) {
-      throw new UnsupportedOperationException(String.format("CoordinatorStreamConsumer is not initialized in the AbstractCoordinatorStreamManager. "
-          + "manager registered source: %s, input source: %s", this.source, source));
-    }
-    return coordinatorStreamConsumer.getBootstrappedStream(source);
-  }
-
-  /**
-   * Register the coordinator stream consumer.
-   */
-  protected void registerCoordinatorStreamConsumer() {
-    if (coordinatorStreamConsumer != null) {
-      coordinatorStreamConsumer.register();
-    }
-  }
-
-  /**
-   * Registers the coordinator stream producer for a given source.
-   * @param source the source to register
-   */
-  protected void registerCoordinatorStreamProducer(String source) {
-    if (coordinatorStreamProducer != null) {
-      coordinatorStreamProducer.register(source);
-    }
-  }
-
-  /**
-   * Returns the source name which is managed by {@link AbstractCoordinatorStreamManager}.
-   * @return the source name
-   */
-  protected String getSource() {
-    return source;
-  }
-
-  /**
-   * Registers a consumer and a producer. Every subclass should implement it's logic for registration.<br><br>
-   * Registering a single consumer and a single producer can be done with {@link AbstractCoordinatorStreamManager#registerCoordinatorStreamConsumer()}
-   * and {@link AbstractCoordinatorStreamManager#registerCoordinatorStreamProducer(String)} methods respectively.<br>
-   * These methods can be used in the concrete implementation of this register method.
-   *
-   * @param taskName name which should be used with the producer
-   */
-  public abstract void register(TaskName taskName);
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc89e10/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamManager.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamManager.java
new file mode 100644
index 0000000..f6e68b5
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamManager.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import java.util.Set;
+import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class which handles the common functionality for coordinator stream consumer and producer
+ */
+public class CoordinatorStreamManager {
+  private static final Logger LOG = LoggerFactory.getLogger(CoordinatorStreamManager.class);
+
+  private final CoordinatorStreamSystemProducer coordinatorStreamProducer;
+  private final CoordinatorStreamSystemConsumer coordinatorStreamConsumer;
+
+  /**
+   * Creates a new read-only {@link CoordinatorStreamManager} with a given coordinator stream consumer.
+   * @param coordinatorStreamConsumer The {@link CoordinatorStreamSystemConsumer} which should be used with the {@link CoordinatorStreamManager}
+   */
+  public CoordinatorStreamManager(CoordinatorStreamSystemConsumer coordinatorStreamConsumer) {
+    this(null, coordinatorStreamConsumer);
+  }
+
+  /**
+   * Creates a new write-only {@link CoordinatorStreamManager} with a given coordinator stream producer.
+   * @param coordinatorStreamProducer The {@link CoordinatorStreamSystemProducer} which should be used with the {@link CoordinatorStreamManager}
+   */
+  public CoordinatorStreamManager(CoordinatorStreamSystemProducer coordinatorStreamProducer) {
+    this(coordinatorStreamProducer, null);
+  }
+
+  /**
+   * Creates a new {@link CoordinatorStreamManager} with a given coordinator stream producer and consumer.
+   * @param coordinatorStreamProducer The {@link CoordinatorStreamSystemProducer} which should be used with the {@link CoordinatorStreamManager}
+   * @param coordinatorStreamConsumer The {@link CoordinatorStreamSystemConsumer} which should be used with the {@link CoordinatorStreamManager}
+   */
+  public CoordinatorStreamManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
+      CoordinatorStreamSystemConsumer coordinatorStreamConsumer) {
+    this.coordinatorStreamProducer = coordinatorStreamProducer;
+    this.coordinatorStreamConsumer = coordinatorStreamConsumer;
+  }
+
+  /**
+   * Creates a new {@link CoordinatorStreamManager} and instantiates the underlying coordinator stream producer and consumer.
+   * @param coordinatorSystemConfig Configuration used to instantiate the coordinator stream producer and consumer.
+   * @param metricsRegistry Metrics registry
+   */
+  public CoordinatorStreamManager(Config coordinatorSystemConfig, MetricsRegistry metricsRegistry) {
+    coordinatorStreamConsumer = new CoordinatorStreamSystemConsumer(coordinatorSystemConfig, metricsRegistry);
+    coordinatorStreamProducer = new CoordinatorStreamSystemProducer(coordinatorSystemConfig, metricsRegistry);
+  }
+
+  /**
+   * Register source with the coordinator stream.
+   * @param source source name passed to the producer's registration; the consumer registers without one
+   */
+  public void register(String source) {
+    if (coordinatorStreamConsumer != null) {
+      LOG.info("Registering coordinator system stream consumer from {}.", source);
+      coordinatorStreamConsumer.register();
+    }
+    if (coordinatorStreamProducer != null) {
+      LOG.info("Registering coordinator system stream producer from {}.", source);
+      coordinatorStreamProducer.register(source);
+    }
+  }
+
+  /**
+   * Starts the underlying coordinator stream producer and consumer.
+   */
+  public void start() {
+    if (coordinatorStreamConsumer != null) {
+      LOG.debug("Starting coordinator system stream consumer.");
+      coordinatorStreamConsumer.start();
+    }
+    if (coordinatorStreamProducer != null) {
+      LOG.debug("Starting coordinator system stream producer.");
+      coordinatorStreamProducer.start();
+    }
+  }
+
+  /**
+   * Stops the underlying coordinator stream producer and consumer.
+   * The consumer is stopped first; the producer is still stopped if stopping the consumer
+   * throws, and that exception then propagates to the caller.
+   */
+  public void stop() {
+    try {
+      if (coordinatorStreamConsumer != null) {
+        coordinatorStreamConsumer.stop();
+      }
+    } finally {
+      if (coordinatorStreamProducer != null) {
+        coordinatorStreamProducer.stop();
+      }
+    }
+  }
+
+  /**
+   * Sends a {@link CoordinatorStreamMessage} using the underlying system producer.
+   * @param message message which should be sent to producer
+   * @throws UnsupportedOperationException if this manager holds no producer (read-only manager)
+   */
+  public void send(CoordinatorStreamMessage message) {
+    if (coordinatorStreamProducer == null) {
+      throw new UnsupportedOperationException(
+          "CoordinatorStreamProducer is not initialized in the CoordinatorStreamManager. ");
+    }
+    coordinatorStreamProducer.send(message);
+  }
+
+  /**
+   * Bootstrap the coordinator stream consumer.
+   */
+  public void bootstrap() {
+    if (coordinatorStreamConsumer != null) {
+      LOG.debug("Bootstrapping coordinator system stream consumer.");
+      coordinatorStreamConsumer.bootstrap();
+    }
+  }
+
+  /**
+   * Returns a set of messages from the bootstrapped stream for a given source.
+   * @param source the source of the given messages
+   * @return a set of {@link CoordinatorStreamMessage} if messages exists for the given source, else an empty set
+   * @throws UnsupportedOperationException if this manager holds no consumer (write-only manager)
+   */
+  public Set<CoordinatorStreamMessage> getBootstrappedStream(String source) {
+    if (coordinatorStreamConsumer == null) {
+      throw new UnsupportedOperationException(
+          "CoordinatorStreamConsumer is not initialized in the CoordinatorStreamManager. ");
+    }
+    return coordinatorStreamConsumer.getBootstrappedStream(source);
+  }
+
+  /**
+   * Returns the config from the coordinator stream consumer.
+   * @return Config of the coordinator stream consumer.
+   * @throws IllegalStateException if this manager holds no consumer (write-only manager)
+   */
+  public Config getConfig() {
+    if (coordinatorStreamConsumer == null) {
+      throw new IllegalStateException(
+          "CoordinatorStreamConsumer is not initialized in the CoordinatorStreamManager. ");
+    }
+    return coordinatorStreamConsumer.getConfig();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc89e10/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
index 4bbf452..0cf79ee 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.coordinator.stream;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -290,4 +291,8 @@ public class CoordinatorStreamSystemConsumer {
     return iterator.hasNext();
   }
 
+  @VisibleForTesting
+  boolean isStarted() {
+    return isStarted;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc89e10/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
index b984e73..701b229 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.coordinator.stream;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -148,4 +149,9 @@ public class CoordinatorStreamSystemProducer {
     }
     systemProducer.flush(source);
   }
+
+  @VisibleForTesting
+  boolean isStarted() {
+    return isStarted;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc89e10/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
index 1115faf..dc2bff8 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
@@ -22,8 +22,10 @@ package org.apache.samza.job.model;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.samza.config.Config;
 import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.TaskName;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 
 /**
@@ -31,7 +33,7 @@ import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
  * The data model used to represent a Samza job. The model is used in the job
  * coordinator and SamzaContainer to determine how to execute Samza jobs.
  * </p>
- * 
+ *
  * <p>
  * The hierarchy for a Samza's job data model is that jobs have containers, and
  * containers have tasks. Each data model contains relevant information, such as
@@ -141,6 +143,15 @@ public class JobModel {
     return containers;
   }
 
+  public Map<TaskName, Integer> getTaskPartitionMappings() {
+    // Flatten all containers' tasks into one map; for a repeated task name the later container wins.
+    return containers.values().stream()
+        .flatMap(containerModel -> containerModel.getTasks().entrySet().stream())
+        .collect(Collectors.toMap(Map.Entry::getKey,
+            taskEntry -> taskEntry.getValue().getChangelogPartition().getPartitionId(),
+            (earlier, later) -> later, HashMap::new));
+  }
+
   @Override
   public String toString() {
     return "JobModel [config=" + config + ", containers=" + containers + "]";

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc89e10/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
index e45b778..d58b251 100644
--- a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
@@ -18,10 +18,12 @@
  */
 package org.apache.samza.standalone;
 
+import org.apache.samza.checkpoint.CheckpointManager;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.TaskConfigJava;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.job.model.JobModel;
@@ -75,6 +77,10 @@ public class PassthroughJobCoordinator implements JobCoordinator {
     JobModel jobModel = null;
     try {
       jobModel = getJobModel();
+      CheckpointManager checkpointManager = new TaskConfigJava(jobModel.getConfig()).getCheckpointManager(null);
+      if (checkpointManager != null) {
+        checkpointManager.createResources();
+      }
     } catch (Exception e) {
       LOGGER.error("Exception while trying to getJobModel.", e);
       if (coordinatorListener != null) {

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc89e10/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java b/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
deleted file mode 100644
index 7e274c7..0000000
--- a/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
-import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
-import org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * The Changelog manager is used to persist and read the changelog information from the coordinator stream.
- */
-public class ChangelogPartitionManager extends AbstractCoordinatorStreamManager {
-
-  private static final Logger log = LoggerFactory.getLogger(ChangelogPartitionManager.class);
-  private boolean isCoordinatorConsumerRegistered = false;
-
-  public ChangelogPartitionManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
-      CoordinatorStreamSystemConsumer coordinatorStreamConsumer,
-      String source) {
-    super(coordinatorStreamProducer, coordinatorStreamConsumer, source);
-  }
-
-  /**
-   * Registers this manager to write changelog mapping for a particular task.
-   * @param taskName The taskname to be registered for changelog mapping.
-   */
-  public void register(TaskName taskName) {
-    log.debug("Adding taskName {} to {}", taskName, this);
-    if (!isCoordinatorConsumerRegistered) {
-      registerCoordinatorStreamConsumer();
-      isCoordinatorConsumerRegistered = true;
-    }
-    registerCoordinatorStreamProducer(taskName.getTaskName());
-  }
-
-  /**
-   * Read the taskName to partition mapping that is being maintained by this ChangelogManager
-   * @return TaskName to change log partition mapping, or an empty map if there were no messages.
-   */
-  public Map<TaskName, Integer> readChangeLogPartitionMapping() {
-    log.debug("Reading changelog partition information");
-    final HashMap<TaskName, Integer> changelogMapping = new HashMap<TaskName, Integer>();
-    for (CoordinatorStreamMessage coordinatorStreamMessage : getBootstrappedStream(SetChangelogMapping.TYPE)) {
-      SetChangelogMapping changelogMapEntry = new SetChangelogMapping(coordinatorStreamMessage);
-      changelogMapping.put(new TaskName(changelogMapEntry.getTaskName()), changelogMapEntry.getPartition());
-      log.debug("TaskName: {} is mapped to {}", changelogMapEntry.getTaskName(), changelogMapEntry.getPartition());
-    }
-    return changelogMapping;
-  }
-
-  /**
-   * Write the taskName to partition mapping that is being maintained by this ChangelogManager
-   * @param changelogEntries The entries that needs to be written to the coordinator stream, the map takes the taskName
-   *                       and it's corresponding changelog partition.
-   */
-  public void writeChangeLogPartitionMapping(Map<TaskName, Integer> changelogEntries) {
-    log.debug("Updating changelog information with: ");
-    for (Map.Entry<TaskName, Integer> entry : changelogEntries.entrySet()) {
-      log.debug("TaskName: {} to Partition: {}", entry.getKey().getTaskName(), entry.getValue());
-      send(new SetChangelogMapping(getSource(), entry.getKey().getTaskName(), entry.getValue()));
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc89e10/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java b/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
new file mode 100644
index 0000000..6aeb2ba
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaStorageConfig;
+import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.CoordinatorStreamManager;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The Changelog manager creates the changelog stream. If a coordinator stream manager is provided,
+ * it can be used to read, write and update the changelog stream partition-to-task mapping.
+ */
+public class ChangelogStreamManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ChangelogStreamManager.class);
+  // This is legacy for changelog. Need to investigate what happens if you use a different source name
+  private static final String SOURCE = "JobModelManager";
+
+  private final CoordinatorStreamManager coordinatorStreamManager;
+
+  /**
+   * Construct changelog manager with a bootstrapped coordinator stream.
+   *
+   * @param coordinatorStreamManager Coordinator stream manager.
+   */
+  public ChangelogStreamManager(CoordinatorStreamManager coordinatorStreamManager) {
+    this.coordinatorStreamManager = coordinatorStreamManager;
+  }
+
+  /**
+   * Read the taskName to partition mapping that is being maintained by this ChangelogManager
+   * @return TaskName to changelog partition mapping, or an empty map if there were no messages.
+   */
+  public Map<TaskName, Integer> readPartitionMapping() {
+    LOG.debug("Reading changelog partition information");
+    final HashMap<TaskName, Integer> changelogMapping = new HashMap<>();
+    for (CoordinatorStreamMessage coordinatorStreamMessage : coordinatorStreamManager.getBootstrappedStream(SetChangelogMapping.TYPE)) {
+      SetChangelogMapping changelogMapEntry = new SetChangelogMapping(coordinatorStreamMessage);
+      changelogMapping.put(new TaskName(changelogMapEntry.getTaskName()), changelogMapEntry.getPartition());
+      LOG.debug("TaskName: {} is mapped to {}", changelogMapEntry.getTaskName(), changelogMapEntry.getPartition());
+    }
+    return changelogMapping;
+  }
+
+  /**
+   * Write the taskName to partition mapping.
+   * @param changelogEntries The entries that need to be written to the coordinator stream, the map takes the taskName
+   *                         and its corresponding changelog partition.
+   */
+  public void writePartitionMapping(Map<TaskName, Integer> changelogEntries) {
+    LOG.debug("Updating changelog information with: ");
+    for (Map.Entry<TaskName, Integer> entry : changelogEntries.entrySet()) {
+      LOG.debug("TaskName: {} to Partition: {}", entry.getKey().getTaskName(), entry.getValue());
+      coordinatorStreamManager.send(new SetChangelogMapping(SOURCE, entry.getKey().getTaskName(), entry.getValue()));
+    }
+  }
+
+  /**
+   * Merge previous and new taskName to partition mapping and write it; previous entries win on conflicts.
+   * @param prevChangelogEntries The previous map of taskName to changelog partition.
+   * @param newChangelogEntries The new map of taskName to changelog partition.
+   */
+  public void updatePartitionMapping(Map<TaskName, Integer> prevChangelogEntries,
+      Map<TaskName, Integer> newChangelogEntries) {
+    Map<TaskName, Integer> combinedEntries = new HashMap<>(newChangelogEntries);
+    combinedEntries.putAll(prevChangelogEntries);
+    writePartitionMapping(combinedEntries);
+  }
+
+  /**
+   * Utility method to create and validate changelog streams. The method is static because it does not require an
+   * instance of the {@link CoordinatorStreamManager}. The system admin is always stopped afterwards.
+   * @param config Config with changelog info
+   * @param maxChangeLogStreamPartitions Maximum changelog stream partitions to create
+   * @throws SamzaException if a store's changelog system is missing from the configuration
+   */
+  public static void createChangelogStreams(Config config, int maxChangeLogStreamPartitions) {
+    // Get changelog store config
+    JavaStorageConfig storageConfig = new JavaStorageConfig(config);
+    Map<String, SystemStream> storeNameSystemStreamMapping = storageConfig.getStoreNames()
+        .stream()
+        .filter(name -> StringUtils.isNotBlank(storageConfig.getChangelogStream(name)))
+        .collect(Collectors.toMap(name -> name,
+            name -> Util.getSystemStreamFromNames(storageConfig.getChangelogStream(name))));
+
+    // Get SystemAdmin for changelog store's system and attempt to create the stream
+    JavaSystemConfig systemConfig = new JavaSystemConfig(config);
+    storeNameSystemStreamMapping.forEach((storeName, systemStream) -> {
+        // Load system admin for this system.
+        SystemAdmin systemAdmin = systemConfig.getSystemAdmin(systemStream.getSystem());
+        if (systemAdmin == null) {
+          throw new SamzaException(String.format(
+              "Error creating changelog. Changelog on store %s uses system %s, which is missing from the configuration.",
+              storeName, systemStream.getSystem()));
+        }
+        StreamSpec changelogSpec = StreamSpec.createChangeLogStreamSpec(
+            systemStream.getStream(), systemStream.getSystem(), maxChangeLogStreamPartitions);
+
+        systemAdmin.start();
+        // Stop the admin in a finally block so a failed create/validate call does not leave it running.
+        try {
+          if (systemAdmin.createStream(changelogSpec)) {
+            LOG.info("created changelog stream {}.", systemStream.getStream());
+          } else {
+            LOG.info("changelog stream {} already exists.", systemStream.getStream());
+          }
+          systemAdmin.validateStream(changelogSpec);
+
+          if (storageConfig.getAccessLogEnabled(storeName)) {
+            String accesslogStream = storageConfig.getAccessLogStream(systemStream.getStream());
+            StreamSpec accesslogSpec = new StreamSpec(
+                accesslogStream, accesslogStream, systemStream.getSystem(), maxChangeLogStreamPartitions);
+            systemAdmin.createStream(accesslogSpec);
+            systemAdmin.validateStream(accesslogSpec);
+          }
+        } finally {
+          systemAdmin.stop();
+        }
+      });
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc89e10/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index c55f21f..3af654e 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -33,6 +33,7 @@ import org.apache.samza.config.JavaSystemConfig;
 import org.apache.samza.config.StorageConfig;
 import org.apache.samza.container.SamzaContainerContext;
 import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.coordinator.stream.CoordinatorStreamManager;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.TaskModel;
@@ -119,8 +120,14 @@ public class StorageRecovery extends CommandLine {
    * map
    */
   private void getContainerModels() {
-    JobModel jobModel = JobModelManager.apply(jobConfig).jobModel();
+    CoordinatorStreamManager coordinatorStreamManager = new CoordinatorStreamManager(jobConfig, new MetricsRegistryMap());
+    coordinatorStreamManager.register(getClass().getSimpleName());
+    coordinatorStreamManager.start();
+    coordinatorStreamManager.bootstrap();
+    ChangelogStreamManager changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager);
+    JobModel jobModel = JobModelManager.apply(coordinatorStreamManager, changelogStreamManager.readPartitionMapping()).jobModel();
     containers = jobModel.getContainers();
+    coordinatorStreamManager.stop();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc89e10/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 801033d..00eeeae 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -28,13 +28,14 @@ import java.util.Objects;
 import java.util.Set;
 import org.I0Itec.zkclient.IZkStateListener;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.checkpoint.CheckpointManager;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.config.TaskConfigJava;
 import org.apache.samza.config.ZkConfig;
-import org.apache.samza.config.StorageConfig;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorListener;
@@ -47,6 +48,7 @@ import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.metrics.ReadableMetricsRegistry;
 import org.apache.samza.runtime.ProcessorIdGenerator;
+import org.apache.samza.storage.ChangelogStreamManager;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.util.ClassLoaderHelper;
@@ -56,7 +58,6 @@ import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * JobCoordinator for stand alone processor managed via Zookeeper.
  */
@@ -95,7 +96,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   private JobCoordinatorListener coordinatorListener = null;
   private JobModel newJobModel;
   private int debounceTimeMs;
-  private boolean hasCreatedChangeLogStreams = false;
+  private boolean hasCreatedStreams = false;
   private String cachedJobModelVersion = null;
   private Map<TaskName, Integer> changeLogPartitionMap = new HashMap<>();
 
@@ -200,10 +201,19 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
 
     // Generate the JobModel
     JobModel jobModel = generateNewJobModel(currentProcessorIds);
-    if (!hasCreatedChangeLogStreams) {
-      JobModelManager.createChangeLogStreams(new StorageConfig(config), jobModel.maxChangeLogStreamPartitions, systemAdmins);
-      hasCreatedChangeLogStreams = true;
+
+    // Create checkpoint and changelog streams if they don't exist
+    if (!hasCreatedStreams) {
+      CheckpointManager checkpointManager = new TaskConfigJava(jobModel.getConfig()).getCheckpointManager(metrics.getMetricsRegistry());
+      if (checkpointManager != null) {
+        checkpointManager.createResources();
+      }
+
+      // Use the static helper: it needs no coordinator stream manager, and ZK doesn't have coordinator streams.
+      ChangelogStreamManager.createChangelogStreams(jobModel.getConfig(), jobModel.maxChangeLogStreamPartitions);
+      hasCreatedStreams = true;
     }
+
     // Assign the next version of JobModel
     String currentJMVersion = zkUtils.getJobModelVersion();
     String nextJMVersion = zkUtils.getNextJobModelVersion(currentJMVersion);

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc89e10/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
index e1bb3ea..e517946 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
@@ -21,18 +21,22 @@ package org.apache.samza.checkpoint
 
 import java.net.URI
 import java.util.regex.Pattern
+
 import joptsimple.OptionSet
 import org.apache.samza.checkpoint.CheckpointTool.TaskNameToCheckpointMap
 import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.config.{JobConfig, ConfigRewriter, Config}
+import org.apache.samza.config.{Config, ConfigRewriter, JobConfig}
 import org.apache.samza.container.TaskName
 import org.apache.samza.job.JobRunner._
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util.{Util, CommandLine, Logging}
+import org.apache.samza.util.{CommandLine, Logging, Util}
 import org.apache.samza.{Partition, SamzaException}
+
 import scala.collection.JavaConverters._
 import org.apache.samza.coordinator.JobModelManager
+import org.apache.samza.coordinator.stream.CoordinatorStreamManager
+import org.apache.samza.storage.ChangelogStreamManager
 
 import scala.collection.mutable.ListBuffer
 
@@ -160,7 +164,12 @@ class CheckpointTool(config: Config, newOffsets: TaskNameToCheckpointMap, manage
     info("Using %s" format manager)
 
     // Find all the TaskNames that would be generated for this job config
-    val jobModelManager = JobModelManager(config)
+    val coordinatorStreamManager = new CoordinatorStreamManager(config, new MetricsRegistryMap())
+    coordinatorStreamManager.register(getClass.getSimpleName)
+    coordinatorStreamManager.start
+    coordinatorStreamManager.bootstrap
+    val changelogManager = new ChangelogStreamManager(coordinatorStreamManager)
+    val jobModelManager = JobModelManager(coordinatorStreamManager, changelogManager.readPartitionMapping())
     val taskNames = jobModelManager
       .jobModel
       .getContainers
@@ -185,6 +194,7 @@ class CheckpointTool(config: Config, newOffsets: TaskNameToCheckpointMap, manage
     }
 
     manager.stop
+    coordinatorStreamManager.stop();
   }
 
   /** Load the most recent checkpoint state for all a specified TaskName. */

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc89e10/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
index ad03e59..e4ee767 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
@@ -21,7 +21,6 @@ package org.apache.samza.config
 
 
 import java.util.concurrent.TimeUnit
-import org.apache.samza.SamzaException
 import scala.collection.JavaConverters._
 import org.apache.samza.util.Logging
 import org.apache.samza.util.Util
@@ -51,7 +50,7 @@ class StorageConfig(config: Config) extends ScalaMapConfig(config) with Logging
   def getStorageMsgSerde(name: String) = getOption(StorageConfig.MSG_SERDE format name)
 
   def getAccessLogEnabled(storeName: String) = {
-    getBoolean(ACCESSLOG_ENABLED format storeName, false)
+    new JavaStorageConfig(config).getAccessLogEnabled(storeName)
   }
 
   def getChangelogStream(name: String) = {
@@ -61,11 +60,11 @@ class StorageConfig(config: Config) extends ScalaMapConfig(config) with Logging
 
   //Returns the accesslog stream name given a changelog stream name
   def getAccessLogStream(changeLogStream: String) = {
-    changeLogStream + "-" + ACCESSLOG_STREAM_SUFFIX
+    new JavaStorageConfig(config).getAccessLogStream(changeLogStream)
   }
 
   def getAccessLogSamplingRatio(storeName: String) = {
-    getInt(ACCESSLOG_SAMPLING_RATIO format storeName, DEFAULT_ACCESSLOG_SAMPLING_RATIO)
+    new JavaStorageConfig(config).getAccessLogSamplingRatio(storeName)
   }
 
   def getChangeLogDeleteRetentionInMs(storeName: String) = {

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc89e10/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index 8b9a72b..fe03a52 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -19,7 +19,8 @@
 
 package org.apache.samza.config
 
-import org.apache.samza.container.RunLoopFactory
+import org.apache.samza.checkpoint.CheckpointManager
+import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.system.SystemStream
 import org.apache.samza.util.{Logging, Util}
 
@@ -33,7 +34,7 @@ object TaskConfig {
   val COMMAND_BUILDER = "task.command.class" // streaming.task-factory-class
   val LIFECYCLE_LISTENERS = "task.lifecycle.listeners" // li-generator,foo
   val LIFECYCLE_LISTENER = "task.lifecycle.listener.%s.class" // task.lifecycle.listener.li-generator.class
-  val CHECKPOINT_MANAGER_FACTORY = "task.checkpoint.factory" // class name to use when sending offset checkpoints
+  val CHECKPOINT_MANAGER_FACTORY = TaskConfigJava.CHECKPOINT_MANAGER_FACTORY // class name to use when sending offset checkpoints
   val MESSAGE_CHOOSER_CLASS_NAME = "task.chooser.class"
   val DROP_DESERIALIZATION_ERROR = "task.drop.deserialization.errors" // define whether drop the messages or not when deserialization fails
   val DROP_SERIALIZATION_ERROR = "task.drop.serialization.errors" // define whether drop the messages or not when serialization fails
@@ -70,6 +71,8 @@ object TaskConfig {
 }
 
 class TaskConfig(config: Config) extends ScalaMapConfig(config) with Logging {
+  val javaTaskConfig = new TaskConfigJava(config)
+
   def getInputStreams = getOption(TaskConfig.INPUT_STREAMS) match {
     case Some(streams) => if (streams.length > 0) {
       streams.split(",").map(systemStreamNames => {
@@ -106,7 +109,11 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) with Logging {
 
   def getCommandClass(defaultValue: String) = getOrElse(TaskConfig.COMMAND_BUILDER, defaultValue)
 
-  def getCheckpointManagerFactory() = getOption(TaskConfig.CHECKPOINT_MANAGER_FACTORY)
+  def getCheckpointManagerFactory() = Option(javaTaskConfig.getCheckpointManagerFactoryName)
+
+  def getCheckpointManager(metricsRegistry: MetricsRegistry): Option[CheckpointManager] = {
+    Option(javaTaskConfig.getCheckpointManager(metricsRegistry))
+  }
 
   def getMessageChooserClass = getOption(TaskConfig.MESSAGE_CHOOSER_CLASS_NAME)
 

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc89e10/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index fda654d..9b18044 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -38,7 +38,7 @@ import org.apache.samza.config._
 import org.apache.samza.container.disk.DiskSpaceMonitor.Listener
 import org.apache.samza.container.disk.{DiskQuotaPolicyFactory, DiskSpaceMonitor, NoThrottlingDiskQuotaPolicyFactory, PollingScanDiskSpaceMonitor}
 import org.apache.samza.container.host.{StatisticsMonitorImpl, SystemMemoryStatistics, SystemStatisticsMonitor}
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer
+import org.apache.samza.coordinator.stream.{CoordinatorStreamManager, CoordinatorStreamSystemProducer}
 import org.apache.samza.job.model.JobModel
 import org.apache.samza.metrics.{JmxServer, JvmMetrics, MetricsRegistryMap, MetricsReporter}
 import org.apache.samza.serializers._
@@ -58,12 +58,6 @@ object SamzaContainer extends Logging {
   val DEFAULT_READ_JOBMODEL_DELAY_MS = 100
   val DISK_POLL_INTERVAL_KEY = "container.disk.poll.interval.ms"
 
-  def getLocalityManager(containerName: String, config: Config): LocalityManager = {
-    val registryMap = new MetricsRegistryMap(containerName)
-    val coordinatorSystemProducer = new CoordinatorStreamSystemProducer(config, new SamzaContainerMetrics(containerName, registryMap).registry)
-    new LocalityManager(coordinatorSystemProducer)
-  }
-
   /**
    * Fetches config, task:SSP assignments, and task:changelog partition
    * assignments, and returns objects to be used for SamzaContainer's
@@ -90,9 +84,13 @@ object SamzaContainer extends Logging {
     val containerName = "samza-container-%s" format containerId
     val maxChangeLogStreamPartitions = jobModel.maxChangeLogStreamPartitions
 
+    var coordinatorStreamManager: CoordinatorStreamManager = null
     var localityManager: LocalityManager = null
     if (new ClusterManagerConfig(config).getHostAffinityEnabled()) {
-      localityManager = getLocalityManager(containerName, config)
+      val registryMap = new MetricsRegistryMap(containerName)
+      val coordinatorStreamSystemProducer = new CoordinatorStreamSystemProducer(config, new SamzaContainerMetrics(containerName, registryMap).registry)
+      coordinatorStreamManager = new CoordinatorStreamManager(coordinatorStreamSystemProducer)
+      localityManager = new LocalityManager(coordinatorStreamManager)
     }
 
     val containerPID = Util.getContainerPID
@@ -625,6 +623,7 @@ object SamzaContainer extends Logging {
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
       offsetManager = offsetManager,
+      coordinatorStreamManager = coordinatorStreamManager,
       localityManager = localityManager,
       securityManager = securityManager,
       metrics = samzaContainerMetrics,
@@ -647,6 +646,7 @@ class SamzaContainer(
   diskSpaceMonitor: DiskSpaceMonitor = null,
   hostStatisticsMonitor: SystemStatisticsMonitor = null,
   offsetManager: OffsetManager = new OffsetManager,
+  coordinatorStreamManager: CoordinatorStreamManager = null,
   localityManager: LocalityManager = null,
   securityManager: SecurityManager = null,
   reporters: Map[String, MetricsReporter] = Map(),
@@ -840,9 +840,15 @@ class SamzaContainer(
 
   def startLocalityManager {
     if(localityManager != null) {
-      info("Registering localityManager for the container")
-      localityManager.start
-      localityManager.register(String.valueOf(containerContext.id))
+      if(coordinatorStreamManager == null) {
+        // This should never happen.
+        throw new IllegalStateException("Cannot start LocalityManager without a CoordinatorStreamManager")
+      }
+
+      val containerName = "SamzaContainer-" + String.valueOf(containerContext.id)
+      info("Registering %s with the coordinator stream manager." format containerName)
+      coordinatorStreamManager.start
+      coordinatorStreamManager.register(containerName)
 
       info("Writing container locality and JMX address to Coordinator Stream")
       try {
@@ -1013,9 +1019,9 @@ class SamzaContainer(
   }
 
   def shutdownLocalityManager {
-    if(localityManager != null) {
-      info("Shutting down locality manager.")
-      localityManager.stop
+    if(coordinatorStreamManager != null) {
+      info("Shutting down coordinator stream manager used by locality manager.")
+      coordinatorStreamManager.stop
     }
   }