You are viewing a plain-text version of this content; the canonical version is the original page in the Apache mailing-list archive.
Posted to commits@samza.apache.org by ni...@apache.org on 2015/04/29 04:57:51 UTC

[4/4] samza git commit: SAMZA-465: Use coordinator stream and eliminate CheckpointManager

SAMZA-465: Use coordinator stream and eliminate CheckpointManager


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/23fb2e1c
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/23fb2e1c
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/23fb2e1c

Branch: refs/heads/master
Commit: 23fb2e1c097317845439ec0abb938821a6106969
Parents: c37d752
Author: Naveen Somasundaram <na...@gmail.com>
Authored: Tue Apr 28 19:57:06 2015 -0700
Committer: Yi Pan (Data Infrastructure) <yi...@yipan-ld1.linkedin.biz>
Committed: Tue Apr 28 19:57:06 2015 -0700

----------------------------------------------------------------------
 build.gradle                                    |   6 +-
 .../org/apache/samza/checkpoint/Checkpoint.java |  72 ---
 .../samza/checkpoint/CheckpointManager.java     |  70 ---
 .../checkpoint/CheckpointManagerFactory.java    |  30 --
 .../org/apache/samza/container/TaskName.java    |   4 +-
 .../org/apache/samza/system/SystemAdmin.java    |  29 +-
 .../org/apache/samza/task/TaskCoordinator.java  |   2 +-
 ...inglePartitionWithoutOffsetsSystemAdmin.java |   5 +
 .../org/apache/samza/checkpoint/Checkpoint.java |  72 +++
 .../samza/checkpoint/CheckpointManager.java     | 116 +++++
 .../stream/CoordinatorStreamMessage.java        | 476 +++++++++++++++++++
 .../stream/CoordinatorStreamSystemConsumer.java | 197 ++++++++
 .../stream/CoordinatorStreamSystemProducer.java | 134 ++++++
 .../org/apache/samza/job/model/TaskModel.java   |  72 +--
 .../serializers/model/JsonTaskModelMixIn.java   |   8 +-
 .../serializers/model/SamzaObjectMapper.java    |  24 +
 .../storage/ChangelogPartitionManager.java      | 115 +++++
 .../samza/checkpoint/CheckpointTool.scala       |  34 +-
 .../apache/samza/checkpoint/OffsetManager.scala |  58 +--
 .../file/FileSystemCheckpointManager.scala      | 107 -----
 .../org/apache/samza/config/JobConfig.scala     |  63 ++-
 .../samza/config/ShellCommandConfig.scala       |   4 +-
 .../org/apache/samza/config/StreamConfig.scala  |   2 -
 .../org/apache/samza/config/TaskConfig.scala    |   2 +-
 .../apache/samza/container/SamzaContainer.scala |  18 +-
 .../samza/coordinator/JobCoordinator.scala      | 270 +++++++----
 .../samza/coordinator/server/HttpServer.scala   |   3 +-
 .../samza/coordinator/server/JobServlet.scala   |   7 +-
 .../stream/CoordinatorStreamSystemFactory.scala |  50 ++
 .../scala/org/apache/samza/job/JobRunner.scala  |  77 +--
 .../samza/job/local/ProcessJobFactory.scala     |   9 +-
 .../samza/job/local/ThreadJobFactory.scala      |   5 +-
 .../apache/samza/serializers/JsonSerde.scala    |  30 +-
 .../filereader/FileReaderSystemAdmin.scala      |   8 +-
 .../main/scala/org/apache/samza/util/Util.scala | 131 ++++-
 .../MockCoordinatorStreamSystemFactory.java     | 118 +++++
 .../MockCoordinatorStreamWrappedConsumer.java   | 128 +++++
 .../stream/TestCoordinatorStreamMessage.java    |  66 +++
 .../TestCoordinatorStreamSystemConsumer.java    | 133 ++++++
 .../TestCoordinatorStreamSystemProducer.java    | 153 ++++++
 .../model/TestSamzaObjectMapper.java            |   6 +-
 samza-core/src/test/resources/test.properties   |   1 +
 .../samza/checkpoint/TestCheckpointTool.scala   |  20 +-
 .../samza/checkpoint/TestOffsetManager.scala    |  50 +-
 .../file/TestFileSystemCheckpointManager.scala  |  86 ----
 .../samza/container/TestSamzaContainer.scala    |  21 +-
 .../task/TestGroupByContainerCount.scala        |   2 +-
 .../samza/coordinator/TestJobCoordinator.scala  | 238 ++++++++--
 .../coordinator/server/TestHttpServer.scala     |   2 +-
 .../apache/samza/job/local/TestProcessJob.scala |   2 +-
 .../samza/serializers/TestJsonSerde.scala       |   4 +-
 .../kafka/KafkaCheckpointLogKey.scala           | 186 --------
 .../kafka/KafkaCheckpointManager.scala          | 427 -----------------
 .../kafka/KafkaCheckpointManagerFactory.scala   | 116 -----
 .../org/apache/samza/config/KafkaConfig.scala   |  10 -
 .../samza/config/RegExTopicGenerator.scala      |   2 +-
 .../samza/system/kafka/KafkaSystemAdmin.scala   |  59 ++-
 .../samza/system/kafka/KafkaSystemFactory.scala |  23 +-
 .../kafka/TestKafkaCheckpointLogKey.scala       |  71 ---
 .../kafka/TestKafkaCheckpointManager.scala      | 211 --------
 .../system/kafka/TestKafkaSystemAdmin.scala     |  26 +-
 .../samza/logging/log4j/StreamAppender.java     |   7 +-
 .../src/main/config/join/common.properties      |   8 +-
 .../src/main/config/negate-number.properties    |   3 +
 .../perf/container-performance.properties       |  12 +-
 .../kafka-read-write-performance.properties     |   3 +
 .../samza/system/mock/MockSystemAdmin.java      |  18 +-
 .../src/main/python/samza_failure_testing.py    |   2 +
 .../test/performance/TestPerformanceTask.scala  |   4 +-
 .../test/integration/TestStatefulTask.scala     |  17 +-
 .../org/apache/samza/config/YarnConfig.scala    |   3 -
 .../apache/samza/job/yarn/SamzaAppMaster.scala  |  12 +-
 .../samza/job/yarn/SamzaAppMasterState.scala    |   3 +-
 .../job/yarn/SamzaAppMasterTaskManager.scala    |  12 +-
 .../org/apache/samza/job/yarn/YarnJob.scala     |   9 +-
 .../job/yarn/TestSamzaAppMasterLifecycle.scala  |   8 +-
 .../job/yarn/TestSamzaAppMasterService.scala    |  15 +-
 .../yarn/TestSamzaAppMasterTaskManager.scala    |  42 +-
 78 files changed, 2815 insertions(+), 1834 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 97de3a2..ebad6eb 100644
--- a/build.gradle
+++ b/build.gradle
@@ -123,7 +123,9 @@ project(':samza-api') {
 
 project(":samza-core_$scalaVersion") {
   apply plugin: 'scala'
-
+  // Force joint Java/Scala compilation: route src/main/java through the Scala source set and clear the Java one.
+  sourceSets.main.scala.srcDir "src/main/java"
+  sourceSets.main.java.srcDirs = []
   jar {
     manifest {
       attributes("Implementation-Version": "$version")
@@ -239,6 +241,7 @@ project(":samza-yarn_$scalaVersion") {
     compile "org.apache.zookeeper:zookeeper:$zookeeperVersion"
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-all:$mockitoVersion"
+    testCompile project(":samza-core_$scalaVersion").sourceSets.test.output
   }
 
   repositories {
@@ -384,6 +387,7 @@ project(":samza-test_$scalaVersion") {
     testCompile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test"
     testCompile "com.101tec:zkclient:$zkClientVersion"
     testCompile project(":samza-kafka_$scalaVersion")
+    testCompile project(":samza-core_$scalaVersion").sourceSets.test.output
     testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
     testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java b/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
deleted file mode 100644
index 593d118..0000000
--- a/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.checkpoint;
-
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * A checkpoint is a mapping of all the streams a job is consuming and the most recent current offset for each.
- * It is used to restore a {@link org.apache.samza.task.StreamTask}, either as part of a job restart or as part
- * of restarting a failed container within a running job.
- */
-public class Checkpoint {
-  private final Map<SystemStreamPartition, String> offsets;
-
-  /**
-   * Constructs a new checkpoint based off a map of Samza stream offsets.
-   * @param offsets Map of Samza streams to their current offset.
-   */
-  public Checkpoint(Map<SystemStreamPartition, String> offsets) {
-    this.offsets = offsets;
-  }
-
-  /**
-   * Gets a unmodifiable view of the current Samza stream offsets.
-   * @return A unmodifiable view of a Map of Samza streams to their recorded offsets.
-   */
-  public Map<SystemStreamPartition, String> getOffsets() {
-    return Collections.unmodifiableMap(offsets);
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (!(o instanceof Checkpoint)) return false;
-
-    Checkpoint that = (Checkpoint) o;
-
-    if (offsets != null ? !offsets.equals(that.offsets) : that.offsets != null) return false;
-
-    return true;
-  }
-
-  @Override
-  public int hashCode() {
-    return offsets != null ? offsets.hashCode() : 0;
-  }
-
-  @Override
-  public String toString() {
-    return "Checkpoint [offsets=" + offsets + "]";
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
deleted file mode 100644
index 092cb91..0000000
--- a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.checkpoint;
-
-import org.apache.samza.SamzaException;
-import org.apache.samza.container.TaskName;
-
-import java.util.Map;
-
-/**
- * CheckpointManagers read and write {@link org.apache.samza.checkpoint.Checkpoint} to some
- * implementation-specific location.
- */
-public interface CheckpointManager {
-  public void start();
-
-  /**
-   * Registers this manager to write checkpoints of a specific Samza stream partition.
-   * @param taskName Specific Samza taskName of which to write checkpoints for.
-   */
-  public void register(TaskName taskName);
-
-  /**
-   * Writes a checkpoint based on the current state of a Samza stream partition.
-   * @param taskName Specific Samza taskName of which to write a checkpoint of.
-   * @param checkpoint Reference to a Checkpoint object to store offset data in.
-   */
-  public void writeCheckpoint(TaskName taskName, Checkpoint checkpoint);
-
-  /**
-   * Returns the last recorded checkpoint for a specified taskName.
-   * @param taskName Specific Samza taskName for which to get the last checkpoint of.
-   * @return A Checkpoint object with the recorded offset data of the specified partition.
-   */
-  public Checkpoint readLastCheckpoint(TaskName taskName);
-
-  /**
-   * Read the taskName to partition mapping that is being maintained by this CheckpointManager
-   *
-   * @return TaskName to task log partition mapping, or an empty map if there were no messages.
-   */
-  public Map<TaskName, Integer> readChangeLogPartitionMapping();
-
-  /**
-   * Write the taskName to partition mapping that is being maintained by this CheckpointManager
-   *
-   * @param mapping Each TaskName's partition within the changelog
-   */
-  public void writeChangeLogPartitionMapping(Map<TaskName, Integer> mapping);
-
-  public void stop();
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java
deleted file mode 100644
index a97ff09..0000000
--- a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.checkpoint;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.metrics.MetricsRegistry;
-
-/**
- * Build a {@link org.apache.samza.checkpoint.CheckpointManager}.
- */
-public interface CheckpointManagerFactory {
-  public CheckpointManager getCheckpointManager(Config config, MetricsRegistry registry);
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-api/src/main/java/org/apache/samza/container/TaskName.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/container/TaskName.java b/samza-api/src/main/java/org/apache/samza/container/TaskName.java
index 0833586..aa19293 100644
--- a/samza-api/src/main/java/org/apache/samza/container/TaskName.java
+++ b/samza-api/src/main/java/org/apache/samza/container/TaskName.java
@@ -21,7 +21,9 @@ package org.apache.samza.container;
 /**
  * A unique identifier of a set of a SystemStreamPartitions that have been grouped by
  * a {@link org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper}.  The
- * SystemStreamPartitionGrouper determines the TaskName for each set it creates.
+ * SystemStreamPartitionGrouper determines the TaskName for each set it creates. The TaskName class
+ * must contain only the taskName string: coordinator-stream users such as ChangelogPartitionManager and
+ * CheckpointManager key their messages by the taskName alone, so it must uniquely identify a TaskName.
  */
 public class TaskName implements Comparable<TaskName> {
   private final String taskName;

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
index 8995ba3..7a588eb 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
@@ -23,10 +23,9 @@ import java.util.Map;
 import java.util.Set;
 
 /**
- * Helper interface attached to an underlying system to fetch
- * information about streams, partitions, offsets, etc. This interface is useful
- * for providing utility methods that Samza needs in order to interact with a
- * system.
+ * Helper interface attached to an underlying system to fetch information about
+ * streams, partitions, offsets, etc. This interface is useful for providing
+ * utility methods that Samza needs in order to interact with a system.
  */
 public interface SystemAdmin {
 
@@ -51,10 +50,22 @@ public interface SystemAdmin {
    */
   Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames);
 
-    /**
-     * An API to create a change log stream
-     * @param streamName The name of the stream to be created in the underlying stream
-     * @param numOfPartitions The number of partitions in the changelog stream
-     */
+  /**
+   * Creates a changelog stream in the underlying system.
+   *
+   * @param streamName
+   *          The name of the changelog stream to create.
+   * @param numOfPartitions
+   *          The number of partitions in the changelog stream.
+   */
   void createChangelogStream(String streamName, int numOfPartitions);
+
+  /**
+   * Create a stream for the job coordinator. If the stream already exists, this
+   * call should simply return.
+   * 
+   * @param streamName
+   *          The name of the coordinator stream to create.
+   */
+  void createCoordinatorStream(String streamName);
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java b/samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java
index 6ff1a55..e7bf6f1 100644
--- a/samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java
+++ b/samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java
@@ -22,7 +22,7 @@ package org.apache.samza.task;
 /**
  * TaskCoordinators are provided to the process methods of {@link org.apache.samza.task.StreamTask} implementations
  * to allow the user code to request actions from the Samza framework, including committing the current checkpoints
- * to configured {@link org.apache.samza.checkpoint.CheckpointManager}s or shutting down the task or all tasks within
+ * to the configured {@code org.apache.samza.checkpoint.CheckpointManager} or shutting down the task or all tasks within
  * a container.
  * <p>
  *   This interface may evolve over time.

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
index 01997ae..a6b14fb 100644
--- a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
@@ -69,4 +69,9 @@ public class SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin {
 
     return offsetsAfter;
   }
+  /** Not supported by this single-partition admin: always throws {@link UnsupportedOperationException}. */
+  @Override
+  public void createCoordinatorStream(String streamName) {
+    throw new UnsupportedOperationException("Single partition admin can't create coordinator streams.");
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/checkpoint/Checkpoint.java b/samza-core/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
new file mode 100644
index 0000000..593d118
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Maps each stream partition a job consumes to the most recent offset recorded for it, so that a
+ * {@link org.apache.samza.task.StreamTask} can be restored after a job or container restart.
+ */
+public class Checkpoint {
+  // The caller's map is stored as-is (not copied); equals/hashCode/toString tolerate null.
+  private final Map<SystemStreamPartition, String> offsets;
+
+  /**
+   * Creates a checkpoint over the given stream offsets.
+   * @param offsets Map of Samza stream partitions to their current offset.
+   */
+  public Checkpoint(Map<SystemStreamPartition, String> offsets) {
+    this.offsets = offsets;
+  }
+
+  /**
+   * Returns the recorded offsets.
+   * @return An unmodifiable view of the stream-partition-to-offset map.
+   */
+  public Map<SystemStreamPartition, String> getOffsets() {
+    return Collections.unmodifiableMap(offsets);
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == this) {
+      return true;
+    }
+    if (other instanceof Checkpoint) {
+      Map<SystemStreamPartition, String> otherOffsets = ((Checkpoint) other).offsets;
+      return offsets == null ? otherOffsets == null : offsets.equals(otherOffsets);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return offsets == null ? 0 : offsets.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return "Checkpoint [offsets=" + offsets + "]";
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java b/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
new file mode 100644
index 0000000..3ac63ca
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetCheckpoint;
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The CheckpointManager is used to persist and restore checkpoint information. The CheckpointManager uses
+ * CoordinatorStream underneath to do this.
+ */
+public class CheckpointManager {
+
+  private static final Logger log = LoggerFactory.getLogger(CheckpointManager.class);
+  private final CoordinatorStreamSystemProducer coordinatorStreamProducer;
+  private final CoordinatorStreamSystemConsumer coordinatorStreamConsumer;
+  private final Map<TaskName, Checkpoint> taskNamesToOffsets;
+  private final HashSet<TaskName> taskNames;
+  private String source;
+
+  public CheckpointManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
+      CoordinatorStreamSystemConsumer coordinatorStreamConsumer) {
+    this.coordinatorStreamConsumer = coordinatorStreamConsumer;
+    this.coordinatorStreamProducer = coordinatorStreamProducer;
+    taskNamesToOffsets = new HashMap<TaskName, Checkpoint>();
+    taskNames = new HashSet<TaskName>();
+    this.source = "Unknown";
+  }
+
+  public CheckpointManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
+      CoordinatorStreamSystemConsumer coordinatorStreamConsumer,
+      String source) {
+    this(coordinatorStreamProducer, coordinatorStreamConsumer);
+    this.source = source;
+  }
+
+  public void start() {
+    coordinatorStreamProducer.start();
+    coordinatorStreamConsumer.start();
+  }
+
+  /**
+   * Registers this manager to write checkpoints of a specific Samza stream partition.
+   * @param taskName Specific Samza taskName of which to write checkpoints for.
+   */
+  public void register(TaskName taskName) {
+    log.debug("Adding taskName {} to {}", taskName, this);
+    taskNames.add(taskName);
+    coordinatorStreamConsumer.register();
+    coordinatorStreamProducer.register(taskName.getTaskName());
+  }
+
+  /**
+   * Writes a checkpoint based on the current state of a Samza stream partition.
+   * @param taskName Specific Samza taskName of which to write a checkpoint of.
+   * @param checkpoint Reference to a Checkpoint object to store offset data in.
+   */
+  public void writeCheckpoint(TaskName taskName, Checkpoint checkpoint) {
+    log.debug("Writing checkpoint for Task: {} with offsets: {}", taskName.getTaskName(), checkpoint.getOffsets());
+    SetCheckpoint checkPointMessage = new SetCheckpoint(source, taskName.getTaskName(), checkpoint);
+    coordinatorStreamProducer.send(checkPointMessage);
+  }
+
+  /**
+   * Returns the last recorded checkpoint for a specified taskName.
+   * @param taskName Specific Samza taskName for which to get the last checkpoint of.
+   * @return A Checkpoint object with the recorded offset data of the specified partition.
+   */
+  public Checkpoint readLastCheckpoint(TaskName taskName) {
+    // Bootstrap each time to make sure that we are caught up with the stream, the bootstrap will just catch up on consecutive calls
+    log.debug("Reading checkpoint for Task: {}", taskName.getTaskName());
+    Set<CoordinatorStreamMessage> bootstrappedStream = coordinatorStreamConsumer.getBootstrappedStream(SetCheckpoint.TYPE);
+    for (CoordinatorStreamMessage coordinatorStreamMessage : bootstrappedStream) {
+      SetCheckpoint setCheckpoint = new SetCheckpoint(coordinatorStreamMessage);
+      TaskName taskNameInCheckpoint = new TaskName(setCheckpoint.getKey());
+      if(taskNames.contains(taskNameInCheckpoint)) {
+        taskNamesToOffsets.put(taskNameInCheckpoint, setCheckpoint.getCheckpoint());
+        log.debug("Adding checkpoint {} for taskName {}", taskNameInCheckpoint, taskName);
+      }
+    }
+    return taskNamesToOffsets.get(taskName);
+  }
+
+  public void stop() {
+    coordinatorStreamConsumer.stop();
+    coordinatorStreamProducer.stop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
new file mode 100644
index 0000000..f8b705f
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * Represents a message for the job coordinator. All messages in the coordinator
+ * stream must wrap the CoordinatorStreamMessage class. Coordinator stream
+ * messages are modeled as key/value pairs. The key is a list of well defined
+ * fields: version, type, and key. The value is a map. There are some
+ * pre-defined fields (such as timestamp, host, etc) for the value map, which
+ * are common to all messages.
+ * </p>
+ * 
+ * <p>
+ * The full structure for a CoordinatorStreamMessage is:
+ * </p>
+ * 
+ * <pre>
+ * key =&gt; [1, "set-config", "job.name"] 
+ * 
+ * message =&gt; {
+ *   "host": "192.168.0.1",
+ *   "username": "criccomini",
+ *   "source": "job-runner",
+ *   "timestamp": 123456789,
+ *   "values": {
+ *     "value": "my-job-name"
+ *   } 
+ * }
+ * </pre>
+ * 
+ * Where the key's structure is:
+ * 
+ * <pre>
+ * key =&gt; [&lt;version&gt;, &lt;type&gt;, &lt;key&gt;]
+ * </pre>
+ * 
+ * <p>
+ * Note that the white space in the above JSON blobs are done for legibility.
+ * Over the wire, the JSON should be compact, and no unnecessary white space
+ * should be used. This is extremely important for key serialization, since a
+ * key with [1,"set-config","job.name"] and [1, "set-config", "job.name"] will
+ * be evaluated as two different keys, and Kafka will not log compact them (if
+ * Kafka is used as the underlying system for a coordinator stream).
+ * </p>
+ * 
+ * <p>
+ * The "values" map in the message is defined on a per-message-type basis. For
+ * set-config messages, there is just a single key/value pair, where the "value"
+ * key is defined. For offset messages, there will be multiple key/values pairs
+ * in "values" (one for each SystemStreamPartition/offset pair for a given
+ * TaskName).
+ * </p>
+ * 
+ * <p>
+ * The most important fields are type, key, and values. The type field (defined
+ * as index 1 in the key list) defines the kind of message, the key (defined as
+ * index 2 in the key list) defines a key to associate with the values, and the
+ * values map defines a set of values associated with the type. A concrete
+ * example would be a config message of type "set-config" with key "job.name"
+ * and values {"value": "my-job-name"}.
+ * </p>
+ */
+public class CoordinatorStreamMessage {
+  /** Position of the protocol version inside the key array. */
+  public static final int VERSION_INDEX = 0;
+  /** Position of the message type (e.g. "set-config") inside the key array. */
+  public static final int TYPE_INDEX = 1;
+  /** Position of the type-specific key (e.g. "job.name") inside the key array. */
+  public static final int KEY_INDEX = 2;
+
+  private static final Logger log = LoggerFactory.getLogger(CoordinatorStreamMessage.class);
+
+  /**
+   * Protocol version for coordinator stream messages. This version number must
+   * be incremented any time new messages are added to the coordinator stream,
+   * or changes are made to the key/message headers.
+   */
+  public static final int VERSION = 1;
+
+  /**
+   * Contains all key fields. Currently, this includes the type of the message,
+   * the key associated with the type (e.g. type: set-config key: job.name), and
+   * the version of the protocol. The indices are defined as the INDEX static
+   * constants above.
+   */
+  private final Object[] keyArray;
+
+  /**
+   * Contains all fields for the message. This includes who sent the message,
+   * the host, etc. It also includes a "values" map, which contains all values
+   * associated with the key of the message. If set-config/job.name were used as
+   * the type/key of the message, then values would contain
+   * {"value":"my-job-name"}. This is null for a delete read off the stream.
+   */
+  private final Map<String, Object> messageMap;
+
+  /**
+   * True when this message is a delete (tombstone). Set automatically when the
+   * message map is null, or explicitly by {@link Delete}.
+   */
+  private boolean isDelete;
+
+  /**
+   * Copy constructor. Shares the key array of {@code message} and uses the
+   * read-only view returned by {@link #getMessageMap()} as this message's map.
+   */
+  public CoordinatorStreamMessage(CoordinatorStreamMessage message) {
+    this(message.getKeyArray(), message.getMessageMap());
+  }
+
+  /**
+   * Wraps an existing key array and message map (for example, one just
+   * deserialized from the stream). A null message map marks a delete.
+   */
+  public CoordinatorStreamMessage(Object[] keyArray, Map<String, Object> messageMap) {
+    this.keyArray = keyArray;
+    this.messageMap = messageMap;
+    this.isDelete = messageMap == null;
+  }
+
+  /**
+   * Creates a new message at the current protocol version. Type and key are
+   * left null for subclasses to fill in.
+   */
+  public CoordinatorStreamMessage(String source) {
+    this(source, new Object[] { Integer.valueOf(VERSION), null, null }, new HashMap<String, Object>());
+  }
+
+  /**
+   * Creates a message from the given key array and message map.
+   * <p>
+   * Unless the message map is null, the standard header fields (source,
+   * username, timestamp, host) are written into it, and its "values" entry is
+   * replaced with a fresh empty map. The protocol version in the key array is
+   * always set to {@link #VERSION}.
+   */
+  public CoordinatorStreamMessage(String source, Object[] keyArray, Map<String, Object> messageMap) {
+    this(keyArray, messageMap);
+    if (!isDelete) {
+      this.messageMap.put("values", new HashMap<String, String>());
+      setSource(source);
+      setUsername(System.getProperty("user.name"));
+      setTimestamp(System.currentTimeMillis());
+
+      try {
+        setHost(InetAddress.getLocalHost().getHostAddress());
+      } catch (UnknownHostException e) {
+        log.warn("Unable to retrieve host for current machine. Setting coordinator stream message host field to an empty string.");
+        setHost("");
+      }
+    }
+
+    setVersion(VERSION);
+  }
+
+  protected void setIsDelete(boolean isDelete) {
+    this.isDelete = isDelete;
+  }
+
+  protected void setHost(String host) {
+    messageMap.put("host", host);
+  }
+
+  protected void setUsername(String username) {
+    messageMap.put("username", username);
+  }
+
+  protected void setSource(String source) {
+    messageMap.put("source", source);
+  }
+
+  protected void setTimestamp(long timestamp) {
+    messageMap.put("timestamp", Long.valueOf(timestamp));
+  }
+
+  protected void setVersion(int version) {
+    this.keyArray[VERSION_INDEX] = Integer.valueOf(version);
+  }
+
+  protected void setType(String type) {
+    this.keyArray[TYPE_INDEX] = type;
+  }
+
+  protected void setKey(String key) {
+    this.keyArray[KEY_INDEX] = key;
+  }
+
+  /**
+   * @return The "values" map of this message. This fails with a
+   *         NullPointerException for a delete whose message map is null.
+   */
+  @SuppressWarnings("unchecked")
+  protected Map<String, String> getMessageValues() {
+    return (Map<String, String>) this.messageMap.get("values");
+  }
+
+  protected String getMessageValue(String key) {
+    return getMessageValues().get(key);
+  }
+
+  /**
+   * @param key
+   *           The key inside the messageMap, please only use human readable string (no JSON or such) - this allows
+   *           easy mutation of the coordinator stream outside of Samza (scripts)
+   * @param value
+   *           The value corresponding to the key, should also be a simple string
+   */
+  protected void putMessageValue(String key, String value) {
+    getMessageValues().put(key, value);
+  }
+
+  /**
+   * The type of the message is used to convert a generic
+   * CoordinatorStreamMessage into a specific message, such as a SetConfig
+   * message.
+   * 
+   * @return The type of the message.
+   */
+  public String getType() {
+    return (String) this.keyArray[TYPE_INDEX];
+  }
+
+  /**
+   * @return The whole key array, including the version, type, and key of the
+   *         message. The array is returned directly, not copied.
+   */
+  public Object[] getKeyArray() {
+    return this.keyArray;
+  }
+
+  /**
+   * @return Whether the message signifies a delete or not.
+   */
+  public boolean isDelete() {
+    return isDelete;
+  }
+
+  /**
+   * @return The user name recorded in the message header.
+   */
+  public String getUsername() {
+    return (String) this.messageMap.get("username");
+  }
+
+  /**
+   * @return The timestamp recorded in the message header.
+   */
+  public long getTimestamp() {
+    // A JSON deserializer (e.g. Jackson's default) yields an Integer for values that
+    // fit in an int, so widen through Number instead of casting straight to Long.
+    return ((Number) this.messageMap.get("timestamp")).longValue();
+  }
+
+  /**
+   * @return A read-only view of the whole message map, including header
+   *         information, or null if this message is a delete.
+   */
+  public Map<String, Object> getMessageMap() {
+    if (isDelete) {
+      return null;
+    }
+    Map<String, Object> immutableMap = new HashMap<String, Object>(messageMap);
+    Map<String, String> values = getMessageValues();
+    // Replace the nested "values" map with a read-only view so that callers cannot
+    // mutate this message through it. Skip this when the entry is absent, because
+    // Collections.unmodifiableMap(null) would throw.
+    if (values != null) {
+      immutableMap.put("values", Collections.unmodifiableMap(values));
+    }
+    return Collections.unmodifiableMap(immutableMap);
+  }
+
+  /**
+   * @return The source that sent the coordinator message. This is a string
+   *         defined by the sender.
+   */
+  public String getSource() {
+    return (String) this.messageMap.get("source");
+  }
+
+  /**
+   * @return The protocol version that the message conforms to.
+   */
+  public int getVersion() {
+    // Widen through Number so the accessor tolerates either Integer or Long in the key array.
+    return ((Number) this.keyArray[VERSION_INDEX]).intValue();
+  }
+
+  /**
+   * @return The key for a message. The key's meaning is defined by the type of
+   *         the message.
+   */
+  public String getKey() {
+    return (String) this.keyArray[KEY_INDEX];
+  }
+
+  @Override
+  public String toString() {
+    return "CoordinatorStreamMessage [keyArray=" + Arrays.toString(keyArray) + ", messageMap=" + messageMap + ", isDelete=" + isDelete + "]";
+  }
+
+  /**
+   * Hash over the delete flag, the key array contents, and the raw message map.
+   */
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + (isDelete ? 1231 : 1237);
+    result = prime * result + Arrays.hashCode(keyArray);
+    result = prime * result + ((messageMap == null) ? 0 : messageMap.hashCode());
+    return result;
+  }
+
+  /**
+   * Two messages are equal when they have the same concrete class, delete flag,
+   * key array contents, and raw message map.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    CoordinatorStreamMessage other = (CoordinatorStreamMessage) obj;
+    if (isDelete != other.isDelete)
+      return false;
+    if (!Arrays.equals(keyArray, other.keyArray))
+      return false;
+    if (messageMap == null) {
+      if (other.messageMap != null)
+        return false;
+    } else if (!messageMap.equals(other.messageMap))
+      return false;
+    return true;
+  }
+
+  /**
+   * A coordinator stream message that tells the job coordinator to set a
+   * specific configuration.
+   */
+  public static class SetConfig extends CoordinatorStreamMessage {
+    public static final String TYPE = "set-config";
+
+    public SetConfig(CoordinatorStreamMessage message) {
+      super(message.getKeyArray(), message.getMessageMap());
+    }
+
+    public SetConfig(String source, String key, String value) {
+      super(source);
+      setType(TYPE);
+      setKey(key);
+      putMessageValue("value", value);
+    }
+
+    public String getConfigValue() {
+      return getMessageValue("value");
+    }
+  }
+
+  public static class Delete extends CoordinatorStreamMessage {
+    public Delete(String source, String key, String type) {
+      this(source, key, type, VERSION);
+    }
+
+    /**
+     * <p>
+     * Delete messages must take the type of another CoordinatorStreamMessage
+     * (e.g. SetConfig) to define the type of message that's being deleted.
+     * Considering Kafka's log compaction, for example, the keys of a message
+     * and its delete key must match exactly:
+     * </p>
+     * 
+     * <pre>
+     * k=&gt;[1,"set-config","job.name"] .. v=&gt; {..some stuff..}
+     * k=&gt;[1,"set-config","job.name"] .. v=&gt; null
+     * </pre>
+     * 
+     * <p>
+     * Deletes are modeled as a CoordinatorStreamMessage with a null message
+     * map, and a key that's identical to the key array that's to be deleted.
+     * A locally constructed Delete still fills in its header fields, but
+     * {@link #getMessageMap()} returns null for it.
+     * </p>
+     * 
+     * @param source
+     *          The source ID of the sender of the delete message.
+     * @param key
+     *          The key to delete.
+     * @param type
+     *          The type of message to delete. Must correspond to one of the
+     *          other CoordinatorStreamMessages.
+     * @param version
+     *          The protocol version.
+     */
+    public Delete(String source, String key, String type, int version) {
+      super(source);
+      setType(type);
+      setKey(key);
+      setVersion(version);
+      setIsDelete(true);
+    }
+  }
+
+  /**
+   * The SetCheckpoint is used to store the checkpoint messages for a particular task.
+   * The structure looks like:
+   * {
+   * Key: TaskName
+   * Type: set-checkpoint
+   * Source: ContainerID
+   * Values (the "values" entry of the message map):
+   *  {
+   *     SSP1 : offset,
+   *     SSP2 : offset
+   *  }
+   * }
+   */
+  public static class SetCheckpoint extends CoordinatorStreamMessage {
+    public static final String TYPE = "set-checkpoint";
+
+    public SetCheckpoint(CoordinatorStreamMessage message) {
+      super(message.getKeyArray(), message.getMessageMap());
+    }
+
+    /**
+     *
+     * @param source The source writing the checkpoint
+     * @param key The key for the checkpoint message (Typically task name)
+     * @param checkpoint Checkpoint message to be written to the stream
+     */
+    public SetCheckpoint(String source, String key, Checkpoint checkpoint) {
+      super(source);
+      setType(TYPE);
+      setKey(key);
+      Map<SystemStreamPartition, String> offsets = checkpoint.getOffsets();
+      for (Map.Entry<SystemStreamPartition, String> systemStreamPartitionStringEntry : offsets.entrySet()) {
+        putMessageValue(Util.sspToString(systemStreamPartitionStringEntry.getKey()), systemStreamPartitionStringEntry.getValue());
+      }
+    }
+
+    /**
+     * @return A Checkpoint rebuilt from the SSP-to-offset entries of the values map.
+     */
+    public Checkpoint getCheckpoint() {
+      Map<SystemStreamPartition, String> offsetMap = new HashMap<SystemStreamPartition, String>();
+      for (Map.Entry<String, String> sspToOffsetEntry : getMessageValues().entrySet()) {
+        offsetMap.put(Util.stringToSsp(sspToOffsetEntry.getKey()), sspToOffsetEntry.getValue());
+      }
+      return new Checkpoint(offsetMap);
+    }
+  }
+
+  /**
+   * The SetChangelogMapping is used to store the changelog partition information for a particular task.
+   * The structure looks like:
+   * {
+   * Key: TaskName
+   * Type: set-changelog
+   * Source: ContainerID
+   * Values (the "values" entry of the message map):
+   *  {
+   *     "Partition" : partitionNumber (The key is just a fixed dummy key here, the value contains the actual partition)
+   *  }
+   * }
+   */
+  public static class SetChangelogMapping extends CoordinatorStreamMessage {
+    public static final String TYPE = "set-changelog";
+
+    public SetChangelogMapping(CoordinatorStreamMessage message) {
+      super(message.getKeyArray(), message.getMessageMap());
+    }
+
+    /**
+     *
+     * @param source Source writing the change log mapping
+     * @param taskName The task name to be used in the mapping
+     * @param changelogPartitionNumber The partition to which the task's changelog is mapped to
+     */
+    public SetChangelogMapping(String source, String taskName, int changelogPartitionNumber) {
+      super(source);
+      setType(TYPE);
+      setKey(taskName);
+      putMessageValue("Partition", String.valueOf(changelogPartitionNumber));
+    }
+
+    public String getTaskName() {
+      return getKey();
+    }
+
+    public int getPartition() {
+      return Integer.parseInt(getMessageValue("Partition"));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
new file mode 100644
index 0000000..2134603
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetConfig;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.SystemStreamPartitionIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A wrapper around a SystemConsumer that provides helpful methods for dealing
+ * with the coordinator stream.
+ */
+public class CoordinatorStreamSystemConsumer {
+  private static final Logger log = LoggerFactory.getLogger(CoordinatorStreamSystemConsumer.class);
+
+  private final Serde<List<?>> keySerde;
+  private final Serde<Map<String, Object>> messageSerde;
+  private final SystemStreamPartition coordinatorSystemStreamPartition;
+  private final SystemConsumer systemConsumer;
+  private final SystemAdmin systemAdmin;
+  private final Map<String, String> configMap;
+  private boolean isBootstrapped;
+  private boolean isStarted;
+
+  /**
+   * Every message read so far, in the order it was read from the stream.
+   * Callers replay this set to rebuild state (e.g. the latest checkpoint per
+   * task), so insertion order must be preserved: a HashSet iterates in hash
+   * order and would let an older message override a newer one. An exact
+   * duplicate of an earlier message keeps the earlier position.
+   */
+  private final Set<CoordinatorStreamMessage> bootstrappedStreamSet = new LinkedHashSet<CoordinatorStreamMessage>();
+
+  /**
+   * Creates a consumer for partition 0 of {@code coordinatorSystemStream},
+   * decoding keys and messages with the supplied serdes.
+   */
+  public CoordinatorStreamSystemConsumer(SystemStream coordinatorSystemStream, SystemConsumer systemConsumer, SystemAdmin systemAdmin, Serde<List<?>> keySerde, Serde<Map<String, Object>> messageSerde) {
+    this.coordinatorSystemStreamPartition = new SystemStreamPartition(coordinatorSystemStream, new Partition(0));
+    this.systemConsumer = systemConsumer;
+    this.systemAdmin = systemAdmin;
+    this.configMap = new HashMap<String, String>();
+    this.isBootstrapped = false;
+    this.keySerde = keySerde;
+    this.messageSerde = messageSerde;
+  }
+
+  /**
+   * Convenience constructor that uses JSON serdes for both keys and messages.
+   */
+  public CoordinatorStreamSystemConsumer(SystemStream coordinatorSystemStream, SystemConsumer systemConsumer, SystemAdmin systemAdmin) {
+    this(coordinatorSystemStream, systemConsumer, systemAdmin, new JsonSerde<List<?>>(), new JsonSerde<Map<String, Object>>());
+  }
+
+  /**
+   * Retrieves the oldest offset in the coordinator stream, and registers the
+   * coordinator stream with the SystemConsumer using the earliest offset.
+   *
+   * @throws SamzaException if the stream or partition metadata is missing.
+   */
+  public void register() {
+    log.debug("Attempting to register: {}", coordinatorSystemStreamPartition);
+    Set<String> streamNames = new HashSet<String>();
+    String streamName = coordinatorSystemStreamPartition.getStream();
+    streamNames.add(streamName);
+    Map<String, SystemStreamMetadata> systemStreamMetadataMap = systemAdmin.getSystemStreamMetadata(streamNames);
+
+    if (systemStreamMetadataMap == null) {
+      throw new SamzaException("Received a null systemStreamMetadataMap from the systemAdmin. This is illegal.");
+    }
+
+    SystemStreamMetadata systemStreamMetadata = systemStreamMetadataMap.get(streamName);
+
+    if (systemStreamMetadata == null) {
+      throw new SamzaException("Expected " + streamName + " to be in system stream metadata.");
+    }
+
+    SystemStreamPartitionMetadata systemStreamPartitionMetadata = systemStreamMetadata.getSystemStreamPartitionMetadata().get(coordinatorSystemStreamPartition.getPartition());
+
+    if (systemStreamPartitionMetadata == null) {
+      throw new SamzaException("Expected metadata for " + coordinatorSystemStreamPartition + " to exist.");
+    }
+
+    String startingOffset = systemStreamPartitionMetadata.getOldestOffset();
+    log.debug("Registering {} with offset {}", coordinatorSystemStreamPartition, startingOffset);
+    systemConsumer.register(coordinatorSystemStreamPartition, startingOffset);
+  }
+
+  /**
+   * Starts the underlying SystemConsumer. Repeated calls are ignored.
+   */
+  public void start() {
+    if(isStarted)
+    {
+      log.info("Coordinator stream consumer already started");
+      return;
+    }
+    log.info("Starting coordinator stream system consumer.");
+    systemConsumer.start();
+    isStarted = true;
+  }
+
+  /**
+   * Stops the underlying SystemConsumer.
+   */
+  public void stop() {
+    log.info("Stopping coordinator stream system consumer.");
+    systemConsumer.stop();
+  }
+
+  /**
+   * Reads messages from the coordinator stream until the iterator reports no more.
+   * <p>
+   * Every message (of any type, including deletes) is appended to the bootstrapped
+   * message set in read order. Set-config messages and their deletes are also
+   * applied to the bootstrapped config map. Later calls presumably continue from
+   * where the consumer's previous read stopped (the consumer keeps its position).
+   *
+   * @throws SamzaException wrapping any failure while reading or decoding.
+   */
+  public void bootstrap() {
+    log.info("Bootstrapping configuration from coordinator stream.");
+    SystemStreamPartitionIterator iterator = new SystemStreamPartitionIterator(systemConsumer, coordinatorSystemStreamPartition);
+
+    try {
+      while (iterator.hasNext()) {
+        IncomingMessageEnvelope envelope = iterator.next();
+        Object[] keyArray = keySerde.fromBytes((byte[]) envelope.getKey()).toArray();
+        Map<String, Object> valueMap = null;
+        // A null payload is a delete (tombstone); leave valueMap null so the message is flagged as such.
+        if (envelope.getMessage() != null) {
+          valueMap = messageSerde.fromBytes((byte[]) envelope.getMessage());
+        }
+        CoordinatorStreamMessage coordinatorStreamMessage = new CoordinatorStreamMessage(keyArray, valueMap);
+        log.debug("Received coordinator stream message: {}", coordinatorStreamMessage);
+        bootstrappedStreamSet.add(coordinatorStreamMessage);
+        if (SetConfig.TYPE.equals(coordinatorStreamMessage.getType())) {
+          String configKey = coordinatorStreamMessage.getKey();
+          if (coordinatorStreamMessage.isDelete()) {
+            configMap.remove(configKey);
+          } else {
+            String configValue = new SetConfig(coordinatorStreamMessage).getConfigValue();
+            configMap.put(configKey, configValue);
+          }
+        }
+      }
+      log.debug("Bootstrapped configuration: {}", configMap);
+      isBootstrapped = true;
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  /**
+   * Returns every message read so far, in stream order. This only bootstraps if
+   * no bootstrap has completed yet. (The misspelled name is kept for compatibility.)
+   */
+  public Set<CoordinatorStreamMessage> getBoostrappedStream() {
+    log.info("Returning the bootstrapped data from the stream");
+    if(!isBootstrapped)
+      bootstrap();
+    return bootstrappedStreamSet;
+  }
+
+  /**
+   * Catches up on the stream, then returns, in stream order, every message read
+   * so far whose type equals {@code type} (compared case-insensitively).
+   *
+   * @param type The message type to select, e.g. "set-checkpoint".
+   * @return A new set containing the matching messages.
+   */
+  public Set<CoordinatorStreamMessage> getBootstrappedStream(String type) {
+    log.debug("Bootstrapping coordinator stream for messages of type {}", type);
+    bootstrap();
+    Set<CoordinatorStreamMessage> bootstrappedStream = new LinkedHashSet<CoordinatorStreamMessage>();
+    for (CoordinatorStreamMessage coordinatorStreamMessage : bootstrappedStreamSet) {
+      if(type.equalsIgnoreCase(coordinatorStreamMessage.getType())) {
+        bootstrappedStream.add(coordinatorStreamMessage);
+      }
+    }
+    return bootstrappedStream;
+  }
+
+  /**
+   * @return The bootstrapped configuration that's been read after bootstrap has
+   *         been invoked.
+   * @throws SamzaException if bootstrap has not completed yet.
+   */
+  public Config getConfig() {
+    if (isBootstrapped) {
+      return new MapConfig(configMap);
+    } else {
+      throw new SamzaException("Must call bootstrap before retrieving config.");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
new file mode 100644
index 0000000..0f3e10e
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A wrapper around a SystemProducer that provides helpful methods for dealing
+ * with the coordinator stream.
+ */
+public class CoordinatorStreamSystemProducer {
+  private static final Logger log = LoggerFactory.getLogger(CoordinatorStreamSystemProducer.class);
+
+  private final Serde<List<?>> keySerde;
+  private final Serde<Map<String, Object>> messageSerde;
+  private final SystemStream systemStream;
+  private final SystemProducer systemProducer;
+  // Accepted by the constructors; no method of this class currently reads it.
+  private final SystemAdmin systemAdmin;
+  private boolean isStarted;
+
+  /**
+   * Builds a producer that writes to {@code systemStream} through
+   * {@code systemProducer}, encoding keys and message maps with the given serdes.
+   */
+  public CoordinatorStreamSystemProducer(SystemStream systemStream, SystemProducer systemProducer, SystemAdmin systemAdmin, Serde<List<?>> keySerde, Serde<Map<String, Object>> messageSerde) {
+    this.keySerde = keySerde;
+    this.messageSerde = messageSerde;
+    this.systemStream = systemStream;
+    this.systemProducer = systemProducer;
+    this.systemAdmin = systemAdmin;
+  }
+
+  /**
+   * Convenience constructor that encodes keys and messages as JSON.
+   */
+  public CoordinatorStreamSystemProducer(SystemStream systemStream, SystemProducer systemProducer, SystemAdmin systemAdmin) {
+    this(systemStream, systemProducer, systemAdmin, new JsonSerde<List<?>>(), new JsonSerde<Map<String, Object>>());
+  }
+
+  /**
+   * Registers {@code source} with the wrapped SystemProducer.
+   *
+   * @param source
+   *          The source name to register.
+   */
+  public void register(String source) {
+    systemProducer.register(source);
+  }
+
+  /**
+   * Starts the wrapped SystemProducer once; later calls only log. This method
+   * does not create the coordinator stream itself.
+   */
+  public void start() {
+    if (!isStarted) {
+      log.info("Starting coordinator stream producer.");
+      systemProducer.start();
+      isStarted = true;
+    } else {
+      log.info("Coordinator stream producer already started");
+    }
+  }
+
+  /**
+   * Stops the wrapped SystemProducer.
+   */
+  public void stop() {
+    log.info("Stopping coordinator stream producer.");
+    systemProducer.stop();
+  }
+
+  /**
+   * Encodes {@code message} and hands it to the SystemProducer under the
+   * message's own source, always targeting partition key 0. A delete is sent
+   * with a null payload so the key acts as a tombstone.
+   *
+   * @param message
+   *          The coordinator stream message to write.
+   * @throws SamzaException wrapping any failure during encoding or sending.
+   */
+  public void send(CoordinatorStreamMessage message) {
+    log.debug("Sending {}", message);
+    try {
+      String sender = message.getSource();
+      List<Object> keyParts = Arrays.asList(message.getKeyArray());
+      byte[] keyBytes = keySerde.toBytes(keyParts);
+      byte[] payloadBytes = message.isDelete() ? null : messageSerde.toBytes(message.getMessageMap());
+      systemProducer.send(sender, new OutgoingMessageEnvelope(systemStream, Integer.valueOf(0), keyBytes, payloadBytes));
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  /**
+   * Writes one SetConfig message per entry of {@code config}, then flushes the
+   * producer for {@code source}.
+   *
+   * @param source
+   *          Free-form identifier of the sender, stored on every message.
+   * @param config
+   *          The configuration entries to persist in the coordinator stream.
+   */
+  public void writeConfig(String source, Config config) {
+    log.debug("Writing config: {}", config);
+    for (Map.Entry<String, String> entry : config.entrySet()) {
+      String configKey = entry.getKey();
+      CoordinatorStreamMessage.SetConfig setConfig = new CoordinatorStreamMessage.SetConfig(source, configKey, entry.getValue());
+      send(setConfig);
+    }
+    systemProducer.flush(source);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java b/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
index eb22d2e..c26690a 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
@@ -20,11 +20,13 @@
 package org.apache.samza.job.model;
 
 import java.util.Collections;
+import java.util.Map;
 import java.util.Set;
 import org.apache.samza.Partition;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.system.SystemStreamPartition;
 
+
 /**
  * <p>
  * The data model used to represent a task. The model is used in the job
@@ -39,12 +41,12 @@ import org.apache.samza.system.SystemStreamPartition;
  */
 public class TaskModel implements Comparable<TaskModel> {
   private final TaskName taskName;
-  private final Set<SystemStreamPartition> systemStreamPartitions;
+  private final Map<SystemStreamPartition, String> systemStreamPartitionsToOffsets;
   private final Partition changelogPartition;
 
-  public TaskModel(TaskName taskName, Set<SystemStreamPartition> systemStreamPartitions, Partition changelogPartition) {
+  public TaskModel(TaskName taskName, Map<SystemStreamPartition, String> systemStreamPartitionsToOffsets, Partition changelogPartition) {
     this.taskName = taskName;
-    this.systemStreamPartitions = Collections.unmodifiableSet(systemStreamPartitions);
+    this.systemStreamPartitionsToOffsets = Collections.unmodifiableMap(systemStreamPartitionsToOffsets);
     this.changelogPartition = changelogPartition;
   }
 
@@ -53,55 +55,55 @@ public class TaskModel implements Comparable<TaskModel> {
   }
 
   public Set<SystemStreamPartition> getSystemStreamPartitions() {
-    return systemStreamPartitions;
+    return systemStreamPartitionsToOffsets.keySet();
   }
 
   public Partition getChangelogPartition() {
     return changelogPartition;
   }
 
-  @Override
-  public String toString() {
-    return "TaskModel [taskName=" + taskName + ", systemStreamPartitions=" + systemStreamPartitions + ", changeLogPartition=" + changelogPartition + "]";
+  public Map<SystemStreamPartition, String> getCheckpointedOffsets() {
+    return systemStreamPartitionsToOffsets;
   }
 
   @Override
-  public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + ((changelogPartition == null) ? 0 : changelogPartition.hashCode());
-    result = prime * result + ((systemStreamPartitions == null) ? 0 : systemStreamPartitions.hashCode());
-    result = prime * result + ((taskName == null) ? 0 : taskName.hashCode());
-    return result;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
+  public boolean equals(Object o) {
+    if (this == o) {
       return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
+    }
+    if (o == null || getClass() != o.getClass()) {
       return false;
-    TaskModel other = (TaskModel) obj;
-    if (changelogPartition == null) {
-      if (other.changelogPartition != null)
-        return false;
-    } else if (!changelogPartition.equals(other.changelogPartition))
+    }
+
+    TaskModel taskModel = (TaskModel) o;
+
+    if (!changelogPartition.equals(taskModel.changelogPartition)) {
       return false;
-    if (systemStreamPartitions == null) {
-      if (other.systemStreamPartitions != null)
-        return false;
-    } else if (!systemStreamPartitions.equals(other.systemStreamPartitions))
+    }
+    if (!systemStreamPartitionsToOffsets.equals(taskModel.systemStreamPartitionsToOffsets)) {
       return false;
-    if (taskName == null) {
-      if (other.taskName != null)
-        return false;
-    } else if (!taskName.equals(other.taskName))
+    }
+    if (!taskName.equals(taskModel.taskName)) {
       return false;
+    }
+
     return true;
   }
 
+  @Override
+  public int hashCode() {
+    int result = taskName.hashCode();
+    result = 31 * result + systemStreamPartitionsToOffsets.hashCode();
+    result = 31 * result + changelogPartition.hashCode();
+    return result;
+  }
+
+  @Override
+
+  public String toString() {
+    return "TaskModel [taskName=" + taskName + ", systemStreamPartitions=" + systemStreamPartitionsToOffsets.keySet() + ", changeLogPartition=" + changelogPartition + "]";
+  }
+
   public int compareTo(TaskModel other) {
     return taskName.compareTo(other.getTaskName());
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
index 7dc431c..172358a 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
@@ -19,7 +19,7 @@
 
 package org.apache.samza.serializers.model;
 
-import java.util.Set;
+import java.util.Map;
 import org.apache.samza.Partition;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.system.SystemStreamPartition;
@@ -31,14 +31,14 @@ import org.codehaus.jackson.annotate.JsonProperty;
  */
 public abstract class JsonTaskModelMixIn {
   @JsonCreator
-  public JsonTaskModelMixIn(@JsonProperty("task-name") TaskName taskName, @JsonProperty("system-stream-partitions") Set<SystemStreamPartition> systemStreamPartitions, @JsonProperty("changelog-partition") Partition changelogPartition) {
+  public JsonTaskModelMixIn(@JsonProperty("task-name") TaskName taskName, @JsonProperty("system-stream-partitions-offsets") Map<SystemStreamPartition, String> systemStreamPartitionsToOffsets, @JsonProperty("changelog-partition") Partition changelogPartition) {
   }
 
   @JsonProperty("task-name")
   abstract TaskName getTaskName();
 
-  @JsonProperty("system-stream-partitions")
-  abstract Set<SystemStreamPartition> getSystemStreamPartitions();
+  @JsonProperty("system-stream-partitions-offsets")
+  abstract Map<SystemStreamPartition, String> getCheckpointedOffsets();
 
   @JsonProperty("changelog-partition")
   abstract Partition getChangelogPartition();

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
index 3517912..17410c5 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
@@ -29,7 +29,9 @@ import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Util;
 import org.codehaus.jackson.JsonGenerator;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.JsonParser;
@@ -39,9 +41,11 @@ import org.codehaus.jackson.Version;
 import org.codehaus.jackson.map.DeserializationContext;
 import org.codehaus.jackson.map.JsonDeserializer;
 import org.codehaus.jackson.map.JsonSerializer;
+import org.codehaus.jackson.map.KeyDeserializer;
 import org.codehaus.jackson.map.MapperConfig;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.PropertyNamingStrategy;
+import org.codehaus.jackson.map.SerializationConfig;
 import org.codehaus.jackson.map.SerializerProvider;
 import org.codehaus.jackson.map.introspect.AnnotatedField;
 import org.codehaus.jackson.map.introspect.AnnotatedMethod;
@@ -76,9 +80,11 @@ public class SamzaObjectMapper {
     // Setup custom serdes for simple data types.
     module.addSerializer(Partition.class, new PartitionSerializer());
     module.addSerializer(SystemStreamPartition.class, new SystemStreamPartitionSerializer());
+    module.addKeySerializer(SystemStreamPartition.class, new SystemStreamPartitionKeySerializer());
     module.addSerializer(TaskName.class, new TaskNameSerializer());
     module.addDeserializer(Partition.class, new PartitionDeserializer());
     module.addDeserializer(SystemStreamPartition.class, new SystemStreamPartitionDeserializer());
+    module.addKeyDeserializer(SystemStreamPartition.class, new SystemStreamPartitionKeyDeserializer());
     module.addDeserializer(Config.class, new ConfigDeserializer());
 
     // Setup mixins for data models.
@@ -88,6 +94,7 @@ public class SamzaObjectMapper {
     mapper.getDeserializationConfig().addMixInAnnotations(ContainerModel.class, JsonContainerModelMixIn.class);
     mapper.getSerializationConfig().addMixInAnnotations(JobModel.class, JsonJobModelMixIn.class);
     mapper.getDeserializationConfig().addMixInAnnotations(JobModel.class, JsonJobModelMixIn.class);
+    mapper.configure(SerializationConfig.Feature.AUTO_DETECT_GETTERS, false);
 
     // Convert camel case to hyphenated field names, and register the module.
     mapper.setPropertyNamingStrategy(new CamelCaseToDashesStrategy());
@@ -138,6 +145,23 @@ public class SamzaObjectMapper {
     }
   }
 
+  public static class SystemStreamPartitionKeySerializer extends JsonSerializer<SystemStreamPartition> {
+    @Override
+    public void serialize(SystemStreamPartition systemStreamPartition, JsonGenerator jgen, SerializerProvider provider)
+        throws IOException {
+      String ssp = Util.sspToString(systemStreamPartition);
+      jgen.writeFieldName(ssp);
+    }
+  }
+
+  public static class SystemStreamPartitionKeyDeserializer extends KeyDeserializer {
+    @Override
+    public Object deserializeKey(String sspString, DeserializationContext ctxt)
+        throws IOException {
+      return Util.stringToSsp(sspString);
+    }
+  }
+
   public static class SystemStreamPartitionSerializer extends JsonSerializer<SystemStreamPartition> {
     @Override
     public void serialize(SystemStreamPartition systemStreamPartition, JsonGenerator jsonGenerator, SerializerProvider provider) throws IOException, JsonProcessingException {

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java b/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
new file mode 100644
index 0000000..fff7634
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The Changelog manager is used to persist and read the changelog information from the coordinator stream.
+ */
+public class ChangelogPartitionManager {
+
+  private static final Logger log = LoggerFactory.getLogger(ChangelogPartitionManager.class);
+  private final CoordinatorStreamSystemProducer coordinatorStreamProducer;
+  private final CoordinatorStreamSystemConsumer coordinatorStreamConsumer;
+  private boolean isCoordinatorConsumerRegistered = false;
+  private String source;
+
+  public ChangelogPartitionManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
+      CoordinatorStreamSystemConsumer coordinatorStreamConsumer) {
+    this.coordinatorStreamConsumer = coordinatorStreamConsumer;
+    this.coordinatorStreamProducer = coordinatorStreamProducer;
+    this.source = "Unknown";
+  }
+
+  public ChangelogPartitionManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
+      CoordinatorStreamSystemConsumer coordinatorStreamConsumer,
+      String source) {
+    this(coordinatorStreamProducer, coordinatorStreamConsumer);
+    this.source = source;
+  }
+
+  public void start() {
+    coordinatorStreamProducer.start();
+    coordinatorStreamConsumer.start();
+  }
+
+  public void stop() {
+    coordinatorStreamConsumer.stop();
+    coordinatorStreamProducer.stop();
+  }
+
+  /**
+   * Registers this manager to write changelog mapping for a particular task.
+   * @param taskName The taskname to be registered for changelog mapping.
+   */
+  public void register(TaskName taskName) {
+    log.debug("Adding taskName {} to {}", taskName, this);
+    if(!isCoordinatorConsumerRegistered) {
+      coordinatorStreamConsumer.register();
+      isCoordinatorConsumerRegistered = true;
+    }
+    coordinatorStreamProducer.register(taskName.getTaskName());
+  }
+
+  /**
+   * Read the taskName to partition mapping that is being maintained by this ChangelogManager
+   * @return TaskName to change log partition mapping, or an empty map if there were no messages.
+   */
+  public Map<TaskName, Integer> readChangeLogPartitionMapping() {
+    log.debug("Reading changelog partition information");
+    Set<CoordinatorStreamMessage> bootstrappedStream = coordinatorStreamConsumer.getBootstrappedStream(SetChangelogMapping.TYPE);
+    HashMap<TaskName, Integer> changelogMapping = new HashMap<TaskName, Integer>();
+    for (CoordinatorStreamMessage coordinatorStreamMessage : bootstrappedStream) {
+      SetChangelogMapping changelogMapEntry = new SetChangelogMapping(coordinatorStreamMessage);
+      changelogMapping.put(new TaskName(changelogMapEntry.getTaskName()), changelogMapEntry.getPartition());
+      log.debug("TaskName: {} is mapped to {}", changelogMapEntry.getTaskName(), changelogMapEntry.getPartition());
+    }
+    return changelogMapping;
+  }
+
+  /**
+   * Write the taskName to partition mapping that is being maintained by this ChangelogManager
+   * @param changelogEntries The entries that needs to be written to the coordinator stream, the map takes the taskName
+   *                       and it's corresponding changelog partition.
+   */
+  public void writeChangeLogPartitionMapping(Map<TaskName, Integer> changelogEntries) {
+    log.debug("Updating changelog information with: ");
+    for (Map.Entry<TaskName, Integer> entry : changelogEntries.entrySet()) {
+      log.debug("TaskName: {} to Partition: {}", entry.getKey().getTaskName(), entry.getValue());
+      SetChangelogMapping changelogMapping = new SetChangelogMapping(source,
+          entry.getKey().getTaskName(),
+          entry.getValue());
+      coordinatorStreamProducer.send(changelogMapping);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
index ddc30af..2e3aeb8 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
@@ -26,14 +26,18 @@ import org.apache.samza.checkpoint.CheckpointTool.TaskNameToCheckpointMap
 import org.apache.samza.config.TaskConfig.Config2Task
 import org.apache.samza.config.{Config, StreamConfig}
 import org.apache.samza.container.TaskName
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util.{CommandLine, Util}
+import org.apache.samza.util.CommandLine
 import org.apache.samza.{Partition, SamzaException}
 import scala.collection.JavaConversions._
 import org.apache.samza.util.Logging
 import org.apache.samza.coordinator.JobCoordinator
 
+import scala.collection.immutable.HashMap
+
+
 /**
  * Command-line tool for inspecting and manipulating the checkpoints for a job.
  * This can be used, for example, to force a job to re-process a stream from the
@@ -113,31 +117,30 @@ object CheckpointTool {
     }
   }
 
+  def apply(config: Config, offsets: TaskNameToCheckpointMap) = {
+    val factory = new CoordinatorStreamSystemFactory
+    val coordinatorStreamConsumer = factory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap())
+    val coordinatorStreamProducer = factory.getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap())
+    val manager = new CheckpointManager(coordinatorStreamProducer, coordinatorStreamConsumer, "checkpoint-tool")
+    new CheckpointTool(config, offsets, manager)
+  }
+
   def main(args: Array[String]) {
     val cmdline = new CheckpointToolCommandLine
     val options = cmdline.parser.parse(args: _*)
     val config = cmdline.loadConfig(options)
-    val tool = new CheckpointTool(config, cmdline.newOffsets)
+    val tool = CheckpointTool(config, cmdline.newOffsets)
     tool.run
   }
 }
 
-class CheckpointTool(config: Config, newOffsets: TaskNameToCheckpointMap) extends Logging {
-  val manager = config.getCheckpointManagerFactory match {
-    case Some(className) =>
-      Util.getObj[CheckpointManagerFactory](className).getCheckpointManager(config, new MetricsRegistryMap)
-    case _ =>
-      throw new SamzaException("This job does not use checkpointing (task.checkpoint.factory is not set).")
-  }
-
-  // The CheckpointManagerFactory needs to perform this same operation when initializing
-  // the manager. TODO figure out some way of avoiding duplicated work.
+class CheckpointTool(config: Config, newOffsets: TaskNameToCheckpointMap, manager: CheckpointManager) extends Logging {
 
   def run {
     info("Using %s" format manager)
 
     // Find all the TaskNames that would be generated for this job config
-    val coordinator = JobCoordinator(config, 1)
+    val coordinator = JobCoordinator(config)
     val taskNames = coordinator
       .jobModel
       .getContainers
@@ -165,7 +168,10 @@ class CheckpointTool(config: Config, newOffsets: TaskNameToCheckpointMap) extend
 
   /** Load the most recent checkpoint state for all a specified TaskName. */
   def readLastCheckpoint(taskName:TaskName): Map[SystemStreamPartition, String] = {
-    manager.readLastCheckpoint(taskName).getOffsets.toMap
+    Option(manager.readLastCheckpoint(taskName))
+            .getOrElse(new Checkpoint(new HashMap[SystemStreamPartition, String]()))
+            .getOffsets
+            .toMap
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index 85c1749..20e5d26 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -72,8 +72,8 @@ object OffsetManager extends Logging {
     config: Config,
     checkpointManager: CheckpointManager = null,
     systemAdmins: Map[String, SystemAdmin] = Map(),
-    offsetManagerMetrics : OffsetManagerMetrics = new OffsetManagerMetrics) = {
-
+    offsetManagerMetrics : OffsetManagerMetrics = new OffsetManagerMetrics,
+    latestOffsets: Map[SystemStreamPartition, String] = Map()) = {
     debug("Building offset manager for %s." format systemStreamMetadata)
 
     val offsetSettings = systemStreamMetadata
@@ -99,7 +99,7 @@ object OffsetManager extends Logging {
           // Build OffsetSetting so we can create a map for OffsetManager.
           (systemStream, OffsetSetting(systemStreamMetadata, defaultOffsetType, resetOffset))
       }.toMap
-    new OffsetManager(offsetSettings, checkpointManager, systemAdmins, offsetManagerMetrics)
+    new OffsetManager(offsetSettings, checkpointManager, systemAdmins, offsetManagerMetrics, latestOffsets)
   }
 }
 
@@ -142,12 +142,19 @@ class OffsetManager(
   /**
    * offsetManagerMetrics for keeping track of checkpointed offsets of each SystemStreamPartition.
    */
-  val offsetManagerMetrics : OffsetManagerMetrics = new OffsetManagerMetrics ) extends Logging {
+  val offsetManagerMetrics : OffsetManagerMetrics = new OffsetManagerMetrics,
+
+  /*
+   * The previously read checkpoints restored from the coordinator stream
+   */
+  val previousCheckpointedOffsets: Map[SystemStreamPartition, String] = Map()
+  ) extends Logging {
 
   /**
    * Last offsets processed for each SystemStreamPartition.
    */
-  var lastProcessedOffsets = Map[SystemStreamPartition, String]()
+  // Filter out null offset values, we can't use them, these exist only because of SSP information
+  var lastProcessedOffsets = previousCheckpointedOffsets.filter(_._2 != null)
 
   /**
    * Offsets to start reading from for each SystemStreamPartition. This
@@ -170,7 +177,8 @@ class OffsetManager(
 
   def start {
     registerCheckpointManager
-    loadOffsetsFromCheckpointManager
+    initializeCheckpointManager
+    loadOffsets
     stripResetStreams
     loadStartingOffsets
     loadDefaults
@@ -240,18 +248,21 @@ class OffsetManager(
     }
   }
 
+  private def initializeCheckpointManager {
+    if (checkpointManager != null) {
+      checkpointManager.start
+    } else {
+      debug("Skipping offset load from checkpoint manager because no manager was defined.")
+    }
+  }
+
   /**
    * Loads last processed offsets from checkpoint manager for all registered
    * partitions.
    */
-  private def loadOffsetsFromCheckpointManager {
-    if (checkpointManager != null) {
-      debug("Loading offsets from checkpoint manager.")
-
-      checkpointManager.start
-
-      lastProcessedOffsets ++= systemStreamPartitions.keys
-        .flatMap(restoreOffsetsFromCheckpoint(_)).filter {
+  private def loadOffsets {
+    debug("Loading offsets")
+    lastProcessedOffsets.filter {
           case (systemStreamPartition, offset) =>
             val shouldKeep = offsetSettings.contains(systemStreamPartition.getSystemStream)
             if (!shouldKeep) {
@@ -260,26 +271,7 @@ class OffsetManager(
             info("Checkpointed offset is currently %s for %s." format (offset, systemStreamPartition))
             shouldKeep
         }
-    } else {
-      debug("Skipping offset load from checkpoint manager because no manager was defined.")
-    }
-  }
-
-  /**
-   * Loads last processed offsets for a single taskName.
-   */
-  private def restoreOffsetsFromCheckpoint(taskName: TaskName): Map[SystemStreamPartition, String] = {
-    debug("Loading checkpoints for taskName: %s." format taskName)
-
-    val checkpoint = checkpointManager.readLastCheckpoint(taskName)
 
-    if (checkpoint != null) {
-      checkpoint.getOffsets.toMap
-    } else {
-      info("Did not receive a checkpoint for taskName %s. Proceeding without a checkpoint." format taskName)
-
-      Map()
-    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
deleted file mode 100644
index 2a87a6e..0000000
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.checkpoint.file
-
-import java.io.File
-import java.io.FileNotFoundException
-import java.io.FileOutputStream
-import java.util
-import org.apache.samza.SamzaException
-import org.apache.samza.checkpoint.Checkpoint
-import org.apache.samza.checkpoint.CheckpointManager
-import org.apache.samza.checkpoint.CheckpointManagerFactory
-import org.apache.samza.config.Config
-import org.apache.samza.config.FileSystemCheckpointManagerConfig.Config2FSCP
-import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.container.TaskName
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.serializers.CheckpointSerde
-import scala.io.Source
-
-class FileSystemCheckpointManager(
-  jobName: String,
-  root: File,
-  serde: CheckpointSerde = new CheckpointSerde) extends CheckpointManager {
-
-  override def register(taskName: TaskName):Unit = Unit
-
-  def getCheckpointFile(taskName: TaskName) = getFile(jobName, taskName, "checkpoints")
-
-  def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) {
-    val bytes = serde.toBytes(checkpoint)
-    val fos = new FileOutputStream(getCheckpointFile(taskName))
-
-    fos.write(bytes)
-    fos.close
-  }
-
-  def readLastCheckpoint(taskName: TaskName): Checkpoint = {
-    try {
-      val bytes = Source.fromFile(getCheckpointFile(taskName)).map(_.toByte).toArray
-
-      serde.fromBytes(bytes)
-    } catch {
-      case e: FileNotFoundException => null
-    }
-  }
-
-  def start {
-    if (!root.exists) {
-      throw new SamzaException("Root directory for file system checkpoint manager does not exist: %s" format root)
-    }
-  }
-
-  def stop {}
-
-  private def getFile(jobName: String, taskName: TaskName, fileType:String) =
-    new File(root, "%s-%s-%s" format (jobName, taskName, fileType))
-
-  private def getChangeLogPartitionMappingFile() = getFile(jobName, new TaskName("partition-mapping"), "changelog-partition-mapping")
-
-  override def readChangeLogPartitionMapping(): util.Map[TaskName, java.lang.Integer] = {
-    try {
-      val bytes = Source.fromFile(getChangeLogPartitionMappingFile()).map(_.toByte).toArray
-      serde.changelogPartitionMappingFromBytes(bytes)
-    } catch {
-      case e: FileNotFoundException => new util.HashMap[TaskName, java.lang.Integer]()
-    }
-  }
-
-  def writeChangeLogPartitionMapping(mapping: util.Map[TaskName, java.lang.Integer]): Unit = {
-    val hashmap = new util.HashMap[TaskName, java.lang.Integer](mapping)
-    val bytes = serde.changelogPartitionMappingToBytes(hashmap)
-    val fos = new FileOutputStream(getChangeLogPartitionMappingFile())
-
-    fos.write(bytes)
-    fos.close
-  }
-}
-
-class FileSystemCheckpointManagerFactory extends CheckpointManagerFactory {
-  def getCheckpointManager(config: Config, registry: MetricsRegistry) = {
-    val name = config
-      .getName
-      .getOrElse(throw new SamzaException("Missing job name in configs"))
-    val root = config
-      .getFileSystemCheckpointRoot
-      .getOrElse(throw new SamzaException("Missing checkpoint root in configs"))
-    new FileSystemCheckpointManager(name, new File(root))
-  }
-}