You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2015/11/02 23:07:48 UTC

[3/3] samza git commit: SAMZA-798 : Performance and stability issue after combining checkpoint and coordinator stream

SAMZA-798 : Performance and stability issue after combining checkpoint and coordinator stream


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/eba9b28f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/eba9b28f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/eba9b28f

Branch: refs/heads/master
Commit: eba9b28f34874b1d9a7e467d8a6046dc5357b4d5
Parents: 8677a27
Author: Navina <na...@gmail.com>
Authored: Mon Nov 2 13:59:42 2015 -0800
Committer: Navina <na...@gmail.com>
Committed: Mon Nov 2 13:59:42 2015 -0800

----------------------------------------------------------------------
 .../versioned/jobs/configuration-table.html     |  23 +
 .../org/apache/samza/checkpoint/Checkpoint.java |  72 ++++
 .../samza/checkpoint/CheckpointManager.java     |  53 +++
 .../checkpoint/CheckpointManagerFactory.java    |  30 ++
 .../org/apache/samza/checkpoint/Checkpoint.java |  72 ----
 .../samza/checkpoint/CheckpointManager.java     |  93 ----
 .../stream/messages/SetCheckpoint.java          |  74 ----
 .../org/apache/samza/job/model/TaskModel.java   |  19 +-
 .../serializers/model/JsonTaskModelMixIn.java   |   9 +-
 .../samza/checkpoint/CheckpointTool.scala       |  13 +-
 .../apache/samza/checkpoint/OffsetManager.scala |  77 ++--
 .../file/FileSystemCheckpointManager.scala      |  87 ++++
 .../apache/samza/container/SamzaContainer.scala |  18 +-
 .../samza/coordinator/JobCoordinator.scala      |  38 +-
 .../samza/migration/JobRunnerMigration.scala    |  18 +-
 .../MockCoordinatorStreamWrappedConsumer.java   |  16 +-
 .../model/TestSamzaObjectMapper.java            |   9 +-
 .../samza/checkpoint/TestCheckpointTool.scala   |   6 +-
 .../samza/checkpoint/TestOffsetManager.scala    |  16 +-
 .../file/TestFileSystemCheckpointManager.scala  |  86 ++++
 .../samza/container/TestSamzaContainer.scala    |  26 +-
 .../task/TestGroupByContainerCount.scala        |   2 +-
 .../samza/coordinator/TestJobCoordinator.scala  | 104 +----
 .../org/apache/samza/job/TestJobRunner.scala    |   4 +-
 .../apache/samza/job/local/TestProcessJob.scala |   2 +-
 .../old/checkpoint/KafkaCheckpointLogKey.scala  | 188 --------
 .../old/checkpoint/KafkaCheckpointManager.scala | 337 ---------------
 .../KafkaCheckpointManagerFactory.scala         | 108 -----
 .../checkpoint/KafkaCheckpointMigration.scala   |  94 ----
 .../kafka/KafkaCheckpointLogKey.scala           | 194 +++++++++
 .../kafka/KafkaCheckpointManager.scala          | 320 ++++++++++++++
 .../kafka/KafkaCheckpointManagerFactory.scala   | 101 +++++
 .../org/apache/samza/config/KafkaConfig.scala   |  10 +
 .../migration/KafkaCheckpointMigration.scala    | 147 +++++++
 .../scala/org/apache/samza/util/KafkaUtil.scala | 114 ++++-
 .../apache/samza/util/KafkaUtilException.scala  |  31 ++
 .../checkpoint/TestKafkaCheckpointManager.scala | 430 -------------------
 .../kafka/TeskKafkaCheckpointLogKey.scala       |  71 +++
 .../kafka/TestKafkaCheckpointManager.scala      | 244 +++++++++++
 .../TestKafkaCheckpointMigration.scala          | 243 +++++++++++
 .../samza/job/yarn/TestContainerAllocator.java  |   2 +-
 .../yarn/TestHostAwareContainerAllocator.java   |   2 +-
 .../samza/job/yarn/TestSamzaTaskManager.java    |   2 +-
 .../job/yarn/TestSamzaAppMasterLifecycle.scala  |   2 +-
 44 files changed, 1974 insertions(+), 1633 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 4adac09..b5d3813 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -1471,6 +1471,29 @@
                   <td class="default">268435456</td>
                   <td class="description">The number of bytes of outgoing messages to write to each HDFS output file before cutting a new file. Defaults to 256MB if not set.</td>
                 </tr>
+
+                <tr>
+                    <th colspan="3" class="section" id="task-migration">
+                        Migrating from Samza 0.9.1 to 0.10.0<br>
+                        <span class="subtitle">
+                            (This section applies if you are upgrading from Samza 0.9.1 to 0.10.0 and have set
+                            <a href="#task-checkpoint-factory" class="property">task.checkpoint.factory</a> to anything <b> other than </b>
+                            <code>org.apache.samza.checkpoint.file.FileSystemCheckpointManagerFactory</code>)
+                        </span>
+                    </th>
+                </tr>
+
+                <tr>
+                    <td class="property" id="task-checkpoint-skip-migration">task.checkpoint.skip-migration</td>
+                    <td class="default">false</td>
+                    <td class="description">
+                        When migrating from 0.9.1 to 0.10.0, the taskName-to-changelog-partition mapping was moved from the checkpoint stream to the coordinator stream. <br />
+                        If you are using a checkpoint manager other than Kafka's
+                        (<code>org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory</code>), you have to
+                        manually migrate the taskName-to-changelog-partition mapping to the coordinator stream. <br />
+                        This can be done with the help of <code>checkpoint-tool.sh</code>.
+                    </td>
+                </tr>
             </tbody>
         </table>
     </body>

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java b/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
new file mode 100644
index 0000000..593d118
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * A checkpoint is a mapping of all the streams a job is consuming and the most recent offset for each.
+ * It is used to restore a {@link org.apache.samza.task.StreamTask}, either as part of a job restart or as part
+ * of restarting a failed container within a running job.
+ */
+public class Checkpoint {
+  private final Map<SystemStreamPartition, String> offsets;
+
+  /**
+   * Constructs a new checkpoint based off a map of Samza stream offsets.
+   * @param offsets Map of Samza streams to their current offset.
+   */
+  public Checkpoint(Map<SystemStreamPartition, String> offsets) {
+    this.offsets = offsets;
+  }
+
+  /**
+   * Gets an unmodifiable view of the current Samza stream offsets.
+   * @return An unmodifiable view of a Map of Samza streams to their recorded offsets.
+   */
+  public Map<SystemStreamPartition, String> getOffsets() {
+    return Collections.unmodifiableMap(offsets);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (!(o instanceof Checkpoint)) return false;
+
+    Checkpoint that = (Checkpoint) o;
+
+    if (offsets != null ? !offsets.equals(that.offsets) : that.offsets != null) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    return offsets != null ? offsets.hashCode() : 0;
+  }
+
+  @Override
+  public String toString() {
+    return "Checkpoint [offsets=" + offsets + "]";
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
new file mode 100644
index 0000000..dc14beb
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import org.apache.samza.container.TaskName;
+
+/**
+ * CheckpointManagers read and write {@link org.apache.samza.checkpoint.Checkpoint} to some
+ * implementation-specific location.
+ */
+public interface CheckpointManager {
+  void start();
+
+  /**
+   * Registers the given taskName with this manager so that checkpoints can be written for it.
+   * @param taskName The Samza taskName for which checkpoints will be written.
+   */
+  void register(TaskName taskName);
+
+  /**
+   * Writes a checkpoint for the specified taskName.
+   * @param taskName The Samza taskName for which to write the checkpoint.
+   * @param checkpoint The Checkpoint holding the offset data to write.
+   */
+  void writeCheckpoint(TaskName taskName, Checkpoint checkpoint);
+
+  /**
+   * Returns the last recorded checkpoint for a specified taskName.
+   * @param taskName The Samza taskName whose last checkpoint should be returned.
+   * @return A Checkpoint object with the recorded offset data of the specified taskName.
+   */
+  Checkpoint readLastCheckpoint(TaskName taskName);
+
+  void stop();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java
new file mode 100644
index 0000000..fe480b5
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+
+/**
+ * Build a {@link org.apache.samza.checkpoint.CheckpointManager}.
+ */
+public interface CheckpointManagerFactory {
+  public CheckpointManager getCheckpointManager(Config config, MetricsRegistry registry);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/checkpoint/Checkpoint.java b/samza-core/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
deleted file mode 100644
index 593d118..0000000
--- a/samza-core/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.checkpoint;
-
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * A checkpoint is a mapping of all the streams a job is consuming and the most recent current offset for each.
- * It is used to restore a {@link org.apache.samza.task.StreamTask}, either as part of a job restart or as part
- * of restarting a failed container within a running job.
- */
-public class Checkpoint {
-  private final Map<SystemStreamPartition, String> offsets;
-
-  /**
-   * Constructs a new checkpoint based off a map of Samza stream offsets.
-   * @param offsets Map of Samza streams to their current offset.
-   */
-  public Checkpoint(Map<SystemStreamPartition, String> offsets) {
-    this.offsets = offsets;
-  }
-
-  /**
-   * Gets a unmodifiable view of the current Samza stream offsets.
-   * @return A unmodifiable view of a Map of Samza streams to their recorded offsets.
-   */
-  public Map<SystemStreamPartition, String> getOffsets() {
-    return Collections.unmodifiableMap(offsets);
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (!(o instanceof Checkpoint)) return false;
-
-    Checkpoint that = (Checkpoint) o;
-
-    if (offsets != null ? !offsets.equals(that.offsets) : that.offsets != null) return false;
-
-    return true;
-  }
-
-  @Override
-  public int hashCode() {
-    return offsets != null ? offsets.hashCode() : 0;
-  }
-
-  @Override
-  public String toString() {
-    return "Checkpoint [offsets=" + offsets + "]";
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java b/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
deleted file mode 100644
index 0185751..0000000
--- a/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.checkpoint;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
-import org.apache.samza.coordinator.stream.messages.SetCheckpoint;
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
-import org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * The CheckpointManager is used to persist and restore checkpoint information. The CheckpointManager uses
- * CoordinatorStream underneath to do this.
- */
-public class CheckpointManager extends AbstractCoordinatorStreamManager {
-
-  private static final Logger log = LoggerFactory.getLogger(CheckpointManager.class);
-  private final Map<TaskName, Checkpoint> taskNamesToOffsets;
-  private final HashSet<TaskName> taskNames;
-
-  public CheckpointManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
-      CoordinatorStreamSystemConsumer coordinatorStreamConsumer,
-      String source) {
-    super(coordinatorStreamProducer, coordinatorStreamConsumer, source);
-    taskNamesToOffsets = new HashMap<TaskName, Checkpoint>();
-    taskNames = new HashSet<TaskName>();
-  }
-
-  /**
-   * Registers this manager to write checkpoints of a specific Samza stream partition.
-   * @param taskName Specific Samza taskName of which to write checkpoints for.
-   */
-  public void register(TaskName taskName) {
-    log.debug("Adding taskName {} to {}", taskName, this);
-    taskNames.add(taskName);
-    registerCoordinatorStreamConsumer();
-    registerCoordinatorStreamProducer(taskName.getTaskName());
-  }
-
-  /**
-   * Writes a checkpoint based on the current state of a Samza stream partition.
-   * @param taskName Specific Samza taskName of which to write a checkpoint of.
-   * @param checkpoint Reference to a Checkpoint object to store offset data in.
-   */
-  public void writeCheckpoint(TaskName taskName, Checkpoint checkpoint) {
-    log.debug("Writing checkpoint for Task: {} with offsets: {}", taskName.getTaskName(), checkpoint.getOffsets());
-    send(new SetCheckpoint(getSource(), taskName.getTaskName(), checkpoint));
-  }
-
-  /**
-   * Returns the last recorded checkpoint for a specified taskName.
-   * @param taskName Specific Samza taskName for which to get the last checkpoint of.
-   * @return A Checkpoint object with the recorded offset data of the specified partition.
-   */
-  public Checkpoint readLastCheckpoint(TaskName taskName) {
-    // Bootstrap each time to make sure that we are caught up with the stream, the bootstrap will just catch up on consecutive calls
-    log.debug("Reading checkpoint for Task: {}", taskName.getTaskName());
-    for (CoordinatorStreamMessage coordinatorStreamMessage : getBootstrappedStream(SetCheckpoint.TYPE)) {
-      SetCheckpoint setCheckpoint = new SetCheckpoint(coordinatorStreamMessage);
-      TaskName taskNameInCheckpoint = new TaskName(setCheckpoint.getKey());
-      if (taskNames.contains(taskNameInCheckpoint)) {
-        taskNamesToOffsets.put(taskNameInCheckpoint, setCheckpoint.getCheckpoint());
-        log.debug("Adding checkpoint {} for taskName {}", taskNameInCheckpoint, taskName);
-      }
-    }
-    return taskNamesToOffsets.get(taskName);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetCheckpoint.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetCheckpoint.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetCheckpoint.java
deleted file mode 100644
index 21afa85..0000000
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetCheckpoint.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.coordinator.stream.messages;
-
-import org.apache.samza.checkpoint.Checkpoint;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.util.Util;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * The SetCheckpoint is used to store the checkpoint messages for a particular task.
- * The structure looks like:
- * {
- * Key: TaskName
- * Type: set-checkpoint
- * Source: ContainerID
- * MessageMap:
- *  {
- *     SSP1 : offset,
- *     SSP2 : offset
- *  }
- * }
- */
-public class SetCheckpoint extends CoordinatorStreamMessage {
-  public static final String TYPE = "set-checkpoint";
-
-  public SetCheckpoint(CoordinatorStreamMessage message) {
-    super(message.getKeyArray(), message.getMessageMap());
-  }
-
-  /**
-   * The SetCheckpoint is used to store checkpoint message for a given task.
-   *
-   * @param source The source writing the checkpoint
-   * @param key The key for the checkpoint message (Typically task name)
-   * @param checkpoint Checkpoint message to be written to the stream
-   */
-  public SetCheckpoint(String source, String key, Checkpoint checkpoint) {
-    super(source);
-    setType(TYPE);
-    setKey(key);
-    Map<SystemStreamPartition, String> offsets = checkpoint.getOffsets();
-    for (Map.Entry<SystemStreamPartition, String> systemStreamPartitionStringEntry : offsets.entrySet()) {
-      putMessageValue(Util.sspToString(systemStreamPartitionStringEntry.getKey()), systemStreamPartitionStringEntry.getValue());
-    }
-  }
-
-  public Checkpoint getCheckpoint() {
-    Map<SystemStreamPartition, String> offsetMap = new HashMap<SystemStreamPartition, String>();
-    for (Map.Entry<String, String> sspToOffsetEntry : getMessageValues().entrySet()) {
-      offsetMap.put(Util.stringToSsp(sspToOffsetEntry.getKey()), sspToOffsetEntry.getValue());
-    }
-    return new Checkpoint(offsetMap);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java b/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
index e00c49d..59bf2e0 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
@@ -20,7 +20,6 @@
 package org.apache.samza.job.model;
 
 import java.util.Collections;
-import java.util.Map;
 import java.util.Set;
 import org.apache.samza.Partition;
 import org.apache.samza.container.TaskName;
@@ -41,12 +40,12 @@ import org.apache.samza.system.SystemStreamPartition;
  */
 public class TaskModel implements Comparable<TaskModel> {
   private final TaskName taskName;
-  private final Map<SystemStreamPartition, String> systemStreamPartitionsToOffsets;
+  private final Set<SystemStreamPartition> systemStreamPartitions;
   private final Partition changelogPartition;
 
-  public TaskModel(TaskName taskName, Map<SystemStreamPartition, String> systemStreamPartitionsToOffsets, Partition changelogPartition) {
+  public TaskModel(TaskName taskName, Set<SystemStreamPartition> systemStreamPartitions, Partition changelogPartition) {
     this.taskName = taskName;
-    this.systemStreamPartitionsToOffsets = Collections.unmodifiableMap(systemStreamPartitionsToOffsets);
+    this.systemStreamPartitions = Collections.unmodifiableSet(systemStreamPartitions);
     this.changelogPartition = changelogPartition;
   }
 
@@ -55,17 +54,13 @@ public class TaskModel implements Comparable<TaskModel> {
   }
 
   public Set<SystemStreamPartition> getSystemStreamPartitions() {
-    return systemStreamPartitionsToOffsets.keySet();
+    return systemStreamPartitions;
   }
 
   public Partition getChangelogPartition() {
     return changelogPartition;
   }
 
-  public Map<SystemStreamPartition, String> getCheckpointedOffsets() {
-    return systemStreamPartitionsToOffsets;
-  }
-
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -80,7 +75,7 @@ public class TaskModel implements Comparable<TaskModel> {
     if (!changelogPartition.equals(taskModel.changelogPartition)) {
       return false;
     }
-    if (!systemStreamPartitionsToOffsets.equals(taskModel.systemStreamPartitionsToOffsets)) {
+    if (!systemStreamPartitions.equals(taskModel.systemStreamPartitions)) {
       return false;
     }
     if (!taskName.equals(taskModel.taskName)) {
@@ -93,7 +88,7 @@ public class TaskModel implements Comparable<TaskModel> {
   @Override
   public int hashCode() {
     int result = taskName.hashCode();
-    result = 31 * result + systemStreamPartitionsToOffsets.hashCode();
+    result = 31 * result + systemStreamPartitions.hashCode();
     result = 31 * result + changelogPartition.hashCode();
     return result;
   }
@@ -101,7 +96,7 @@ public class TaskModel implements Comparable<TaskModel> {
   @Override
 
   public String toString() {
-    return "TaskModel [taskName=" + taskName + ", systemStreamPartitions=" + systemStreamPartitionsToOffsets.keySet() + ", changeLogPartition=" + changelogPartition + "]";
+    return "TaskModel [taskName=" + taskName + ", systemStreamPartitions=" + systemStreamPartitions + ", changeLogPartition=" + changelogPartition + "]";
   }
 
   public int compareTo(TaskModel other) {

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
index 172358a..3ebe391 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
@@ -19,7 +19,8 @@
 
 package org.apache.samza.serializers.model;
 
-import java.util.Map;
+import java.util.Set;
+
 import org.apache.samza.Partition;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.system.SystemStreamPartition;
@@ -31,14 +32,14 @@ import org.codehaus.jackson.annotate.JsonProperty;
  */
 public abstract class JsonTaskModelMixIn {
   @JsonCreator
-  public JsonTaskModelMixIn(@JsonProperty("task-name") TaskName taskName, @JsonProperty("system-stream-partitions-offsets") Map<SystemStreamPartition, String> systemStreamPartitionsToOffsets, @JsonProperty("changelog-partition") Partition changelogPartition) {
+  public JsonTaskModelMixIn(@JsonProperty("task-name") TaskName taskName, @JsonProperty("system-stream-partitions") Set<SystemStreamPartition> systemStreamPartitions, @JsonProperty("changelog-partition") Partition changelogPartition) {
   }
 
   @JsonProperty("task-name")
   abstract TaskName getTaskName();
 
-  @JsonProperty("system-stream-partitions-offsets")
-  abstract Map<SystemStreamPartition, String> getCheckpointedOffsets();
+  @JsonProperty("system-stream-partitions")
+  abstract Set<SystemStreamPartition> getSystemStreamPartitions();
 
   @JsonProperty("changelog-partition")
   abstract Partition getChangelogPartition();

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
index 2e3aeb8..31b208f 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
@@ -29,10 +29,9 @@ import org.apache.samza.container.TaskName
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util.CommandLine
+import org.apache.samza.util.{Util, CommandLine, Logging}
 import org.apache.samza.{Partition, SamzaException}
 import scala.collection.JavaConversions._
-import org.apache.samza.util.Logging
 import org.apache.samza.coordinator.JobCoordinator
 
 import scala.collection.immutable.HashMap
@@ -118,10 +117,12 @@ object CheckpointTool {
   }
 
   def apply(config: Config, offsets: TaskNameToCheckpointMap) = {
-    val factory = new CoordinatorStreamSystemFactory
-    val coordinatorStreamConsumer = factory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap())
-    val coordinatorStreamProducer = factory.getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap())
-    val manager = new CheckpointManager(coordinatorStreamProducer, coordinatorStreamConsumer, "checkpoint-tool")
+    val manager = config.getCheckpointManagerFactory match {
+      case Some(className) =>
+        Util.getObj[CheckpointManagerFactory](className).getCheckpointManager(config, new MetricsRegistryMap)
+      case _ =>
+        throw new SamzaException("This job does not use checkpointing (task.checkpoint.factory is not set).")
+    }
     new CheckpointTool(config, offsets, manager)
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index 1464acc..00648e4 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -72,8 +72,7 @@ object OffsetManager extends Logging {
     config: Config,
     checkpointManager: CheckpointManager = null,
     systemAdmins: Map[String, SystemAdmin] = Map(),
-    offsetManagerMetrics: OffsetManagerMetrics = new OffsetManagerMetrics,
-    latestOffsets: Map[TaskName, Map[SystemStreamPartition, String]] = Map()) = {
+    offsetManagerMetrics: OffsetManagerMetrics = new OffsetManagerMetrics) = {
     debug("Building offset manager for %s." format systemStreamMetadata)
 
     val offsetSettings = systemStreamMetadata
@@ -99,7 +98,7 @@ object OffsetManager extends Logging {
           // Build OffsetSetting so we can create a map for OffsetManager.
           (systemStream, OffsetSetting(systemStreamMetadata, defaultOffsetType, resetOffset))
       }.toMap
-    new OffsetManager(offsetSettings, checkpointManager, systemAdmins, offsetManagerMetrics, latestOffsets)
+    new OffsetManager(offsetSettings, checkpointManager, systemAdmins, offsetManagerMetrics)
   }
 }
 
@@ -142,22 +141,12 @@ class OffsetManager(
   /**
    * offsetManagerMetrics for keeping track of checkpointed offsets of each SystemStreamPartition.
    */
-  val offsetManagerMetrics: OffsetManagerMetrics = new OffsetManagerMetrics,
-
-  /*
-   * The previously read checkpoints restored from the coordinator stream
-   */
-  val previousCheckpointedOffsets: Map[TaskName, Map[SystemStreamPartition, String]] = Map()) extends Logging {
+  val offsetManagerMetrics: OffsetManagerMetrics = new OffsetManagerMetrics) extends Logging {
 
   /**
    * Last offsets processed for each SystemStreamPartition.
    */
-  // Filter out null offset values, we can't use them, these exist only because of SSP information
-  var lastProcessedOffsets = previousCheckpointedOffsets.map {
-    case (taskName, sspToOffset) => {
-      taskName -> sspToOffset.filter(_._2 != null)
-    }
-  }
+  var lastProcessedOffsets = Map[TaskName, Map[SystemStreamPartition, String]]()
 
   /**
    * Offsets to start reading from for each SystemStreamPartition. This
@@ -175,13 +164,12 @@ class OffsetManager(
   def register(taskName: TaskName, systemStreamPartitionsToRegister: Set[SystemStreamPartition]) {
     systemStreamPartitions.getOrElseUpdate(taskName, mutable.Set[SystemStreamPartition]()).addAll(systemStreamPartitionsToRegister)
     // register metrics
-    systemStreamPartitions.foreach { case (taskName, ssp) => ssp.foreach(ssp => offsetManagerMetrics.addCheckpointedOffset(ssp, "")) }
+    systemStreamPartitionsToRegister.foreach(ssp => offsetManagerMetrics.addCheckpointedOffset(ssp, "")) // SSPs from earlier calls were registered then
   }
 
   def start {
     registerCheckpointManager
-    initializeCheckpointManager
-    loadOffsets
+    loadOffsetsFromCheckpointManager
     stripResetStreams
     loadStartingOffsets
     loadDefaults
@@ -269,32 +257,51 @@ class OffsetManager(
     }
   }
 
-  private def initializeCheckpointManager {
+  /**
+   * Loads last processed offsets from checkpoint manager for all registered
+   * partitions.
+   */
+  private def loadOffsetsFromCheckpointManager {
     if (checkpointManager != null) {
+      debug("Loading offsets from checkpoint manager.")
+
       checkpointManager.start
+      // Read the last checkpoint of every task that has been registered.
+      val result = systemStreamPartitions.keys.flatMap(restoreOffsetsFromCheckpoint(_)).toMap
+      // Keep only offsets whose stream has an entry in offsetSettings (i.e. is still an input).
+      lastProcessedOffsets ++= result.map {
+        case (taskName, sspToOffset) => {
+          taskName -> sspToOffset.filter {
+            case (systemStreamPartition, offset) =>
+              val shouldKeep = offsetSettings.contains(systemStreamPartition.getSystemStream)
+              if (shouldKeep) {
+                info("Checkpointed offset is currently %s for %s" format (offset, systemStreamPartition))
+              } else {
+                info("Ignoring previously checkpointed offset %s for %s since the offset is for a stream that is not currently an input stream." format (offset, systemStreamPartition))
+              }
+              shouldKeep
+          }
+        }
+      }
     } else {
       debug("Skipping offset load from checkpoint manager because no manager was defined.")
     }
   }
 
   /**
-   * Loads last processed offsets from checkpoint manager for all registered
-   * partitions.
+   * Loads last processed offsets for a single taskName.
    */
-  private def loadOffsets {
-    debug("Loading offsets")
-    lastProcessedOffsets.map {
-      case (taskName, sspToOffsets) => {
-        taskName -> sspToOffsets.filter {
-          case (systemStreamPartition, offset) =>
-            val shouldKeep = offsetSettings.contains(systemStreamPartition.getSystemStream)
-            if (!shouldKeep) {
-              info("Ignoring previously checkpointed offset %s for %s since the offset is for a stream that is not currently an input stream." format (offset, systemStreamPartition))
-            }
-            info("Checkpointed offset is currently %s for %s." format (offset, systemStreamPartition))
-            shouldKeep
-        }
-      }
+  private def restoreOffsetsFromCheckpoint(taskName: TaskName): Map[TaskName, Map[SystemStreamPartition, String]] = {
+    debug("Loading checkpoints for taskName: %s." format taskName)
+
+    // readLastCheckpoint may return null when this task has no checkpoint.
+    Option(checkpointManager.readLastCheckpoint(taskName)) match {
+      case Some(lastCheckpoint) =>
+        Map(taskName -> lastCheckpoint.getOffsets.toMap)
+      case None =>
+        info("Did not receive a checkpoint for taskName %s. Proceeding without a checkpoint." format taskName)
+        // An empty offset map: nothing is restored for this task.
+        Map(taskName -> Map())
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
new file mode 100644
index 0000000..edd0ace
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.file
+
+import java.io.File
+import java.nio.file.{Files, NoSuchFileException}
+import java.util
+import org.apache.samza.SamzaException
+import org.apache.samza.checkpoint.Checkpoint
+import org.apache.samza.checkpoint.CheckpointManager
+import org.apache.samza.checkpoint.CheckpointManagerFactory
+import org.apache.samza.config.Config
+import org.apache.samza.config.FileSystemCheckpointManagerConfig.Config2FSCP
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.container.TaskName
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.serializers.CheckpointSerde
+
+/**
+ * A CheckpointManager that stores one checkpoint file per task in the root
+ * directory, named "jobName-taskName-checkpoints".
+ */
+class FileSystemCheckpointManager(
+                                   jobName: String,
+                                   root: File,
+                                   serde: CheckpointSerde = new CheckpointSerde) extends CheckpointManager {
+
+  // No per-task setup is needed; a task's file is created by its first write.
+  override def register(taskName: TaskName) {}
+
+  def getCheckpointFile(taskName: TaskName) = getFile(jobName, taskName, "checkpoints")
+
+  /**
+   * Serializes the checkpoint and overwrites the task's checkpoint file.
+   * Files.write closes the file even when the write fails.
+   */
+  def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) {
+    Files.write(getCheckpointFile(taskName).toPath, serde.toBytes(checkpoint))
+  }
+
+  /**
+   * Returns the task's last written checkpoint, or null if the task has no
+   * checkpoint file yet. The file is read as raw bytes (no character
+   * decoding) and its handle is closed before returning.
+   */
+  def readLastCheckpoint(taskName: TaskName): Checkpoint = {
+    try {
+      serde.fromBytes(Files.readAllBytes(getCheckpointFile(taskName).toPath))
+    } catch {
+      case e: NoSuchFileException => null
+    }
+  }
+
+  /** Throws SamzaException if the root directory does not exist. */
+  def start {
+    if (!root.exists) {
+      throw new SamzaException("Root directory for file system checkpoint manager does not exist: %s" format root)
+    }
+  }
+
+  def stop {}
+
+  private def getFile(jobName: String, taskName: TaskName, fileType:String) =
+    new File(root, "%s-%s-%s" format (jobName, taskName, fileType))
+}
+
+/**
+ * Creates a FileSystemCheckpointManager from the job name and checkpoint root in
+ * the config, throwing SamzaException if either is missing.
+ */
+class FileSystemCheckpointManagerFactory extends CheckpointManagerFactory {
+  def getCheckpointManager(config: Config, registry: MetricsRegistry) = {
+    val name = config
+      .getName
+      .getOrElse(throw new SamzaException("Missing job name in configs"))
+    val root = config
+      .getFileSystemCheckpointRoot
+      .getOrElse(throw new SamzaException("Missing checkpoint root in configs"))
+    new FileSystemCheckpointManager(name, new File(root))
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 0b73403..3787b85 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -21,8 +21,7 @@ package org.apache.samza.container
 
 import java.io.File
 import org.apache.samza.SamzaException
-import org.apache.samza.checkpoint.{ CheckpointManager, OffsetManager }
-import org.apache.samza.config.Config
+import org.apache.samza.checkpoint.{CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics}
 import org.apache.samza.config.MetricsConfig.Config2Metrics
 import org.apache.samza.config.SerializerConfig.Config2Serializer
 import org.apache.samza.config.ShellCommandConfig
@@ -61,7 +60,6 @@ import org.apache.samza.job.model.{TaskModel, ContainerModel, JobModel}
 import org.apache.samza.serializers.model.SamzaObjectMapper
 import org.apache.samza.config.JobConfig.Config2Job
 import java.lang.Thread.UncaughtExceptionHandler
-import org.apache.samza.checkpoint.OffsetManagerMetrics
 
 object SamzaContainer extends Logging {
   def main(args: Array[String]) {
@@ -308,15 +306,17 @@ object SamzaContainer extends Logging {
 
     val coordinatorSystemConsumer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemConsumer(config, samzaContainerMetrics.registry)
     val coordinatorSystemProducer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemProducer(config, samzaContainerMetrics.registry)
-    val checkpointManager = new CheckpointManager(coordinatorSystemProducer, coordinatorSystemConsumer, String.valueOf(containerId))
     val localityManager = new LocalityManager(coordinatorSystemProducer, coordinatorSystemConsumer)
-
+    val checkpointManager = config.getCheckpointManagerFactory() match {
+      case Some(checkpointFactoryClassName) if (!checkpointFactoryClassName.isEmpty) =>
+        Util
+          .getObj[CheckpointManagerFactory](checkpointFactoryClassName)
+          .getCheckpointManager(config, samzaContainerMetrics.registry)
+      case _ => null
+    }
     info("Got checkpoint manager: %s" format checkpointManager)
 
-    val combinedOffsets: Map[TaskName, Map[SystemStreamPartition, String]] =
-      containerModel.getTasks.map{case (taskName, taskModel) => taskName -> mapAsScalaMap(taskModel.getCheckpointedOffsets).toMap }.toMap
-
-    val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, systemAdmins, offsetManagerMetrics, combinedOffsets)
+    val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, systemAdmins, offsetManagerMetrics)
 
     info("Got offset manager: %s" format offsetManager)
 

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
index 03299cb..ef40c35 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
@@ -74,7 +74,6 @@ object JobCoordinator extends Logging {
     coordinatorSystemConsumer.bootstrap
     val config = coordinatorSystemConsumer.getConfig
     info("Got config: %s" format config)
-    val checkpointManager = new CheckpointManager(coordinatorSystemProducer, coordinatorSystemConsumer, "Job-coordinator")
     val changelogManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, "Job-coordinator")
     val localityManager = new LocalityManager(coordinatorSystemProducer, coordinatorSystemConsumer)
 
@@ -91,7 +90,7 @@ object JobCoordinator extends Logging {
 
     val streamMetadataCache = new StreamMetadataCache(systemAdmins)
 
-    val jobCoordinator = getJobCoordinator(rewriteConfig(config), checkpointManager, changelogManager, localityManager, streamMetadataCache)
+    val jobCoordinator = getJobCoordinator(rewriteConfig(config), changelogManager, localityManager, streamMetadataCache)
     createChangeLogStreams(config, jobCoordinator.jobModel.maxChangeLogStreamPartitions, streamMetadataCache)
 
     jobCoordinator
@@ -103,14 +102,13 @@ object JobCoordinator extends Logging {
    * Build a JobCoordinator using a Samza job's configuration.
    */
   def getJobCoordinator(config: Config,
-                        checkpointManager: CheckpointManager,
                         changelogManager: ChangelogPartitionManager,
                         localityManager: LocalityManager,
                         streamMetadataCache: StreamMetadataCache) = {
-    val jobModelGenerator = initializeJobModel(config, checkpointManager, changelogManager, localityManager, streamMetadataCache)
+    val jobModelGenerator = initializeJobModel(config, changelogManager, localityManager, streamMetadataCache)
     val server = new HttpServer
     server.addServlet("/*", new JobServlet(jobModelGenerator))
-    currentJobCoordinator = new JobCoordinator(jobModelGenerator(), server, checkpointManager)
+    currentJobCoordinator = new JobCoordinator(jobModelGenerator(), server)
     currentJobCoordinator
   }
 
@@ -170,7 +168,6 @@ object JobCoordinator extends Logging {
    * which catchup with the latest content from the coordinator stream.
    */
   private def initializeJobModel(config: Config,
-                                 checkpointManager: CheckpointManager,
                                  changelogManager: ChangelogPartitionManager,
                                  localityManager: LocalityManager,
                                  streamMetadataCache: StreamMetadataCache): () => JobModel = {
@@ -192,10 +189,6 @@ object JobCoordinator extends Logging {
     {
       new util.HashMap[TaskName, Integer]()
     }
-
-    checkpointManager.start()
-    groups.foreach(taskSSP => checkpointManager.register(taskSSP._1))
-
     // We don't need to start() localityManager as they share the same instances with checkpoint and changelog managers.
     // TODO: This code will go away with refactoring - SAMZA-678
 
@@ -204,7 +197,6 @@ object JobCoordinator extends Logging {
     // Generate the jobModel
     def jobModelGenerator(): JobModel = refreshJobModel(config,
                                                         allSystemStreamPartitions,
-                                                        checkpointManager,
                                                         groups,
                                                         previousChangelogMapping,
                                                         localityManager)
@@ -238,7 +230,6 @@ object JobCoordinator extends Logging {
    */
   private def refreshJobModel(config: Config,
                               allSystemStreamPartitions: util.Set[SystemStreamPartition],
-                              checkpointManager: CheckpointManager,
                               groups: util.Map[TaskName, util.Set[SystemStreamPartition]],
                               previousChangelogMapping: util.Map[TaskName, Integer],
                               localityManager: LocalityManager): JobModel = {
@@ -253,17 +244,6 @@ object JobCoordinator extends Logging {
       {
         groups.map
                 { case (taskName, systemStreamPartitions) =>
-                  val checkpoint = Option(checkpointManager.readLastCheckpoint(taskName)).getOrElse(new Checkpoint(new util.HashMap[SystemStreamPartition, String]()))
-                  // Find the system partitions which don't have a checkpoint and set null for the values for offsets
-                  val taskOffsets = checkpoint.getOffsets
-                  val offsetMap = new util.HashMap[SystemStreamPartition, String]()
-                  systemStreamPartitions.foreach {
-                    ssp =>
-                      if(taskOffsets.containsKey(ssp))
-                        offsetMap.put(ssp, taskOffsets.get(ssp))
-                      else
-                        offsetMap.put(ssp, null)
-                  }
                   val changelogPartition = Option(previousChangelogMapping.get(taskName)) match
                   {
                     case Some(changelogPartitionId) => new Partition(changelogPartitionId)
@@ -274,7 +254,7 @@ object JobCoordinator extends Logging {
                       info("New task %s is being assigned changelog partition %s." format(taskName, maxChangelogPartitionId))
                       new Partition(maxChangelogPartitionId)
                   }
-                  new TaskModel(taskName, offsetMap, changelogPartition)
+                  new TaskModel(taskName, systemStreamPartitions, changelogPartition)
                 }.toSet
       }
 
@@ -336,12 +316,7 @@ class JobCoordinator(
   /**
    * HTTP server used to serve a Samza job's container model to SamzaContainers when they start up.
    */
-  val server: HttpServer = null,
-
-  /**
-   * Handle to checkpoint manager that's used to refresh the JobModel
-   */
-  val checkpointManager: CheckpointManager) extends Logging {
+  val server: HttpServer = null) extends Logging {
 
   debug("Got job model: %s." format jobModel)
 
@@ -358,9 +333,6 @@ class JobCoordinator(
       debug("Stopping HTTP server.")
       server.stop
       info("Stopped HTTP server.")
-      debug("Stopping checkpoint manager.")
-      checkpointManager.stop()
-      info("Stopped checkpoint manager.")
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala b/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala
index 374e27e..f38b87a 100644
--- a/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala
+++ b/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala
@@ -24,7 +24,10 @@ import org.apache.samza.SamzaException
 
 
 object JobRunnerMigration {
-  val CHECKPOINTMIGRATION = "old.checkpoint.KafkaCheckpointMigration"
+  val CHECKPOINT_MIGRATION = "org.apache.samza.migration.KafkaCheckpointMigration"
+  val UNSUPPORTED_ERROR_MSG = "Auto checkpoint migration for 0.10.0 upgrade is only supported for Kafka checkpointing system, " +
+    "for everything else, please use the checkpoint tool to migrate the taskname-to-changelog mapping, and add " +
+    "task.checkpoint.skip-migration=true to your configs."
   def apply(config: Config) = {
     val migration = new JobRunnerMigration
     migration.checkpointMigration(config)
@@ -38,15 +41,18 @@ class JobRunnerMigration extends Logging {
     checkpointFactory match {
       case Some("org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory") =>
         info("Performing checkpoint migration")
-        val checkpointMigrationPlan = Util.getObj[MigrationPlan](JobRunnerMigration.CHECKPOINTMIGRATION)
+        val checkpointMigrationPlan = Util.getObj[MigrationPlan](JobRunnerMigration.CHECKPOINT_MIGRATION)
         checkpointMigrationPlan.migrate(config)
       case None =>
         info("No task.checkpoint.factory defined, not performing any checkpoint migration")
       case _ =>
-        val errorMsg = "Auto checkpoint migration for 0.10.0 upgrade is only supported for Kafka checkpointing system, " +
-          "for everything else, please use the checkpoint tool and remove task.checkpoint.factory configuration"
-        error(errorMsg)
-        throw new SamzaException(errorMsg)
+        val skipMigration = config.getBoolean("task.checkpoint.skip-migration", false)
+        if (skipMigration) {
+          info("Job is configured to skip any checkpoint migration.")
+        } else {
+          error(JobRunnerMigration.UNSUPPORTED_ERROR_MSG)
+          throw new SamzaException(JobRunnerMigration.UNSUPPORTED_ERROR_MSG)
+        }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
index dd04d28..429573b 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
@@ -20,22 +20,18 @@
 package org.apache.samza.coordinator.stream;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import org.apache.samza.SamzaException;
-import org.apache.samza.checkpoint.Checkpoint;
 import org.apache.samza.config.Config;
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
-import org.apache.samza.coordinator.stream.messages.SetCheckpoint;
 import org.apache.samza.coordinator.stream.messages.SetConfig;
 import org.apache.samza.serializers.model.SamzaObjectMapper;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.BlockingEnvelopeMap;
-import org.apache.samza.util.Util;
 import org.codehaus.jackson.map.ObjectMapper;
 
 /**
@@ -47,7 +43,6 @@ import org.codehaus.jackson.map.ObjectMapper;
 public class MockCoordinatorStreamWrappedConsumer extends BlockingEnvelopeMap {
   private final static ObjectMapper MAPPER = SamzaObjectMapper.getObjectMapper();
   public final static String CHANGELOGPREFIX = "ch:";
-  public final static String CHECKPOINTPREFIX = "cp:";
   public final CountDownLatch blockConsumerPoll = new CountDownLatch(1);
   public boolean blockpollFlag = false;
 
@@ -78,16 +73,7 @@ public class MockCoordinatorStreamWrappedConsumer extends BlockingEnvelopeMap {
       for (Map.Entry<String, String> configPair : config.entrySet()) {
         byte[] keyBytes = null;
         byte[] messgeBytes = null;
-        if (configPair.getKey().startsWith(CHECKPOINTPREFIX)) {
-          String[] checkpointInfo = configPair.getKey().split(":");
-          String[] sspOffsetPair = configPair.getValue().split(":");
-          HashMap<SystemStreamPartition, String> checkpointMap = new HashMap<SystemStreamPartition, String>();
-          checkpointMap.put(Util.stringToSsp(sspOffsetPair[0]), sspOffsetPair[1]);
-          Checkpoint cp = new Checkpoint(checkpointMap);
-          SetCheckpoint setCheckpoint = new SetCheckpoint(checkpointInfo[1], checkpointInfo[2], cp);
-          keyBytes = MAPPER.writeValueAsString(setCheckpoint.getKeyArray()).getBytes("UTF-8");
-          messgeBytes = MAPPER.writeValueAsString(setCheckpoint.getMessageMap()).getBytes("UTF-8");
-        } else if (configPair.getKey().startsWith(CHANGELOGPREFIX)) {
+        if (configPair.getKey().startsWith(CHANGELOGPREFIX)) {
           String[] changelogInfo = configPair.getKey().split(":");
           String changeLogPartition = configPair.getValue();
           SetChangelogMapping changelogMapping = new SetChangelogMapping(changelogInfo[1], changelogInfo[2], Integer.parseInt(changeLogPartition));

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
index ad1fbc5..2c64598 100644
--- a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
+++ b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
@@ -21,7 +21,10 @@ package org.apache.samza.serializers.model;
 
 import static org.junit.Assert.assertEquals;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
+
 import org.apache.samza.Partition;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
@@ -38,12 +41,12 @@ public class TestSamzaObjectMapper {
   public void testJsonTaskModel() throws Exception {
     ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
     Map<String, String> configMap = new HashMap<String, String>();
-    Map<SystemStreamPartition, String> sspOffset = new HashMap<SystemStreamPartition, String>();
+    Set<SystemStreamPartition> ssp = new HashSet<>();
     configMap.put("a", "b");
     Config config = new MapConfig(configMap);
     TaskName taskName = new TaskName("test");
-    sspOffset.put(new SystemStreamPartition("foo", "bar", new Partition(1)), "");
-    TaskModel taskModel = new TaskModel(taskName, sspOffset, new Partition(2));
+    ssp.add(new SystemStreamPartition("foo", "bar", new Partition(1)));
+    TaskModel taskModel = new TaskModel(taskName, ssp, new Partition(2));
     Map<TaskName, TaskModel> tasks = new HashMap<TaskName, TaskModel>();
     tasks.put(taskName, taskModel);
     ContainerModel containerModel = new ContainerModel(1, tasks);

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
index 00b8977..0865b31 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
@@ -41,7 +41,7 @@ object TestCheckpointTool {
   var systemProducer: SystemProducer = null
   var systemAdmin: SystemAdmin = null
 
-  class MockCheckpointManagerFactory {
+  class MockCheckpointManagerFactory extends CheckpointManagerFactory {
     def getCheckpointManager(config: Config, registry: MetricsRegistry) = checkpointManager
   }
 
@@ -87,7 +87,7 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
 
   @Test
   def testReadLatestCheckpoint {
-    val checkpointTool = new CheckpointTool(config, null, TestCheckpointTool.checkpointManager)
+    val checkpointTool = CheckpointTool(config, null)
     checkpointTool.run
     verify(TestCheckpointTool.checkpointManager).readLastCheckpoint(tn0)
     verify(TestCheckpointTool.checkpointManager).readLastCheckpoint(tn1)
@@ -99,7 +99,7 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
     val toOverwrite = Map(tn0 -> Map(new SystemStreamPartition("test", "foo", p0) -> "42"),
       tn1 -> Map(new SystemStreamPartition("test", "foo", p1) -> "43"))
 
-    val checkpointTool = new CheckpointTool(config, toOverwrite, TestCheckpointTool.checkpointManager)
+    val checkpointTool = CheckpointTool(config, toOverwrite)
     checkpointTool.run
     verify(TestCheckpointTool.checkpointManager)
       .writeCheckpoint(tn0, new Checkpoint(Map(new SystemStreamPartition("test", "foo", p0) -> "42")))

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index c00ef91..75ba8af 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -65,7 +65,7 @@ class TestOffsetManager {
     val config = new MapConfig
     val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
     val systemAdmins = Map("test-system" -> getSystemAdmin)
-    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins, new OffsetManagerMetrics, checkpointManager.getOffets)
+    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins, new OffsetManagerMetrics)
     offsetManager.register(taskName, Set(systemStreamPartition))
     offsetManager.start
     assertTrue(checkpointManager.isStarted)
@@ -97,7 +97,7 @@ class TestOffsetManager {
     val config = new MapConfig
     val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
     val systemAdmins = Map("test-system" -> getSystemAdmin)
-    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins, new OffsetManagerMetrics, checkpointManager.getOffets)
+    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins, new OffsetManagerMetrics)
     offsetManager.register(taskName, Set(systemStreamPartition))
     offsetManager.start
     // Should get offset 45 back from the checkpoint manager, which is last processed, and system admin should return 46 as starting offset.
@@ -242,17 +242,17 @@ class TestOffsetManager {
 
   private def getCheckpointManager(systemStreamPartition: SystemStreamPartition, taskName:TaskName = new TaskName("taskName")) = {
     val checkpoint = new Checkpoint(Map(systemStreamPartition -> "45"))
-    new CheckpointManager(null, null, null) {
+    new CheckpointManager {
       var isStarted = false
       var isStopped = false
       var registered = Set[TaskName]()
       var checkpoints: Map[TaskName, Checkpoint] = Map(taskName -> checkpoint)
       var taskNameToPartitionMapping: util.Map[TaskName, java.lang.Integer] = new util.HashMap[TaskName, java.lang.Integer]()
-      override def start { isStarted = true }
-      override def register(taskName: TaskName) { registered += taskName }
-      override def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) { checkpoints += taskName -> checkpoint }
-      override def readLastCheckpoint(taskName: TaskName) = checkpoints.getOrElse(taskName, null)
-      override def stop { isStopped = true }
+      def start { isStarted = true }
+      def register(taskName: TaskName) { registered += taskName }
+      def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) { checkpoints += taskName -> checkpoint }
+      def readLastCheckpoint(taskName: TaskName) = checkpoints.getOrElse(taskName, null)
+      def stop { isStopped = true }
 
       // Only for testing purposes - not present in actual checkpoint manager
       def getOffets = Map(taskName -> mapAsScalaMap(checkpoint.getOffsets()).toMap)

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
new file mode 100644
index 0000000..4ca738e
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.file
+
+import java.io.File
+import scala.collection.JavaConversions._
+import java.util.Random
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+import org.apache.samza.SamzaException
+import org.apache.samza.Partition
+import org.apache.samza.checkpoint.Checkpoint
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.container.TaskName
+import org.junit.rules.TemporaryFolder
+
+/**
+ * Tests for [[FileSystemCheckpointManager]]. Every test runs against a fresh
+ * JUnit [[TemporaryFolder]] that is created before and deleted after each test.
+ */
+class TestFileSystemCheckpointManager  {
+  val taskName = new TaskName("Warwickshire")
+
+  val tempFolder = new TemporaryFolder
+
+  @Before
+  def createTempFolder = tempFolder.create()
+
+  @After
+  def deleteTempFolder = tempFolder.delete()
+
+  @Test
+  def testReadForCheckpointFileThatDoesNotExistShouldReturnNull {
+    val cpm = new FileSystemCheckpointManager("some-job-name", tempFolder.getRoot)
+    assertNull(cpm.readLastCheckpoint(taskName))
+  }
+
+  @Test
+  def testReadForCheckpointFileThatDoesExistShouldReturnProperCheckpoint {
+    val cp = new Checkpoint(Map(
+      new SystemStreamPartition("a", "b", new Partition(0)) -> "c",
+      new SystemStreamPartition("a", "c", new Partition(1)) -> "d",
+      new SystemStreamPartition("b", "d", new Partition(2)) -> "e"))
+
+    val cpm = new FileSystemCheckpointManager("some-job-name", tempFolder.getRoot)
+
+    cpm.start
+    cpm.writeCheckpoint(taskName, cp)
+    val readCp = cpm.readLastCheckpoint(taskName)
+    cpm.stop
+
+    assertNotNull(readCp)
+    assertEquals(cp.getOffsets.keySet(), readCp.getOffsets.keySet())
+    assertEquals(cp.getOffsets, readCp.getOffsets)
+    assertEquals(cp, readCp)
+  }
+
+  @Test
+  def testMissingRootDirectoryShouldFailOnManagerCreation {
+    // A path under the freshly created temp folder cannot exist yet, so start must reject it.
+    val missingRoot = new File(tempFolder.getRoot, "missing-" + new Random().nextInt)
+    val cpm = new FileSystemCheckpointManager("some-job-name", missingRoot)
+    try {
+      cpm.start
+      fail("Expected an exception since root directory for fs checkpoint manager doesn't exist.")
+    } catch {
+      case e: SamzaException => // this is expected
+    } finally {
+      cpm.stop
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index a77ddc7..d91b1da 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -54,7 +54,7 @@ import org.scalatest.junit.AssertionsForJUnit
 import java.lang.Thread.UncaughtExceptionHandler
 import org.apache.samza.serializers._
 import org.apache.samza.SamzaException
-import org.apache.samza.checkpoint.CheckpointManager
+import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
 
 class TestSamzaContainer extends AssertionsForJUnit {
   @Test
@@ -63,15 +63,15 @@ class TestSamzaContainer extends AssertionsForJUnit {
     val offsets = new util.HashMap[SystemStreamPartition, String]()
     offsets.put(new SystemStreamPartition("system","stream", new Partition(0)), "1")
     val tasks = Map(
-      new TaskName("t1") -> new TaskModel(new TaskName("t1"), offsets, new Partition(0)),
-      new TaskName("t2") -> new TaskModel(new TaskName("t2"), offsets, new Partition(0)))
+      new TaskName("t1") -> new TaskModel(new TaskName("t1"), offsets.keySet(), new Partition(0)),
+      new TaskName("t2") -> new TaskModel(new TaskName("t2"), offsets.keySet(), new Partition(0)))
     val containers = Map(
       Integer.valueOf(0) -> new ContainerModel(0, tasks),
       Integer.valueOf(1) -> new ContainerModel(1, tasks))
     val jobModel = new JobModel(config, containers)
     def jobModelGenerator(): JobModel = jobModel
     val server = new HttpServer
-    val coordinator = new JobCoordinator(jobModel, server, new MockCheckpointManager)
+    val coordinator = new JobCoordinator(jobModel, server)
     coordinator.server.addServlet("/*", new JobServlet(jobModelGenerator))
     try {
       coordinator.start
@@ -87,12 +87,12 @@ class TestSamzaContainer extends AssertionsForJUnit {
     val offsets = new util.HashMap[SystemStreamPartition, String]()
     offsets.put(new SystemStreamPartition("system", "stream", new Partition(0)), "1")
     val tasksForContainer1 = Map(
-      new TaskName("t1") -> new TaskModel(new TaskName("t1"), offsets, new Partition(0)),
-      new TaskName("t2") -> new TaskModel(new TaskName("t2"), offsets, new Partition(1)))
+      new TaskName("t1") -> new TaskModel(new TaskName("t1"), offsets.keySet(), new Partition(0)),
+      new TaskName("t2") -> new TaskModel(new TaskName("t2"), offsets.keySet(), new Partition(1)))
     val tasksForContainer2 = Map(
-      new TaskName("t3") -> new TaskModel(new TaskName("t3"), offsets, new Partition(2)),
-      new TaskName("t4") -> new TaskModel(new TaskName("t4"), offsets, new Partition(3)),
-      new TaskName("t5") -> new TaskModel(new TaskName("t6"), offsets, new Partition(4)))
+      new TaskName("t3") -> new TaskModel(new TaskName("t3"), offsets.keySet(), new Partition(2)),
+      new TaskName("t4") -> new TaskModel(new TaskName("t4"), offsets.keySet(), new Partition(3)),
+      new TaskName("t5") -> new TaskModel(new TaskName("t5"), offsets.keySet(), new Partition(4)))
     val containerModel1 = new ContainerModel(0, tasksForContainer1)
     val containerModel2 = new ContainerModel(1, tasksForContainer2)
     val containers = Map(
@@ -204,7 +204,13 @@ class TestSamzaContainer extends AssertionsForJUnit {
   }
 }
 
-class MockCheckpointManager extends CheckpointManager(null, null, "Unknown") {
+class MockCheckpointManager extends CheckpointManager {
   override def start() = {}
   override def stop() = {}
+  // No-op: this mock keeps no per-task registration state.
+  override def register(taskName: TaskName): Unit = {}
+  // Reports an empty checkpoint (no offsets) for every task.
+  override def readLastCheckpoint(taskName: TaskName): Checkpoint = new Checkpoint(Map[SystemStreamPartition, String]())
+  // Written checkpoints are discarded.
+  override def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint): Unit = {}
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala b/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
index ddf1fde..6e9c6fa 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
@@ -71,6 +71,6 @@ class TestGroupByContainerCount {
   }
 
   private def getTaskModel(name: String, partitionId: Int) = {
-    new TaskModel(new TaskName(name), Map[SystemStreamPartition, String](), new Partition(partitionId))
+    new TaskModel(new TaskName(name), Set[SystemStreamPartition](), new Partition(partitionId))
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index 1393da8..80cccf3 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -19,19 +19,14 @@
 
 package org.apache.samza.coordinator
 
-
-import java.net.SocketTimeoutException
-
-import org.apache.samza.util.Util
 import org.junit.{After, Test}
 import org.junit.Assert._
-import org.junit.rules.ExpectedException
 import scala.collection.JavaConversions._
 import org.apache.samza.config.MapConfig
 import org.apache.samza.config.TaskConfig
 import org.apache.samza.config.SystemConfig
 import org.apache.samza.container.{SamzaContainer, TaskName}
-import org.apache.samza.metrics.{MetricsRegistryMap, MetricsRegistry}
+import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.config.Config
 import org.apache.samza.system.SystemFactory
 import org.apache.samza.system.SystemAdmin
@@ -67,20 +62,16 @@ class TestJobCoordinator {
     // Construct the expected JobModel, so we can compare it to
     // JobCoordinator's JobModel.
     val container0Tasks = Map(
-      task0Name -> new TaskModel(task0Name, checkpoint0, new Partition(4)),
-      task2Name -> new TaskModel(task2Name, checkpoint2, new Partition(5)))
+      task0Name -> new TaskModel(task0Name, checkpoint0.keySet, new Partition(4)),
+      task2Name -> new TaskModel(task2Name, checkpoint2.keySet, new Partition(5)))
     val container1Tasks = Map(
-      task1Name -> new TaskModel(task1Name, checkpoint1, new Partition(3)))
+      task1Name -> new TaskModel(task1Name, checkpoint1.keySet, new Partition(3)))
     val containers = Map(
       Integer.valueOf(0) -> new ContainerModel(0, container0Tasks),
       Integer.valueOf(1) -> new ContainerModel(1, container1Tasks))
 
 
     // The test does not pass offsets for task2 (Partition 2) to the checkpointmanager, this will verify that we get an offset 0 for this partition
-    val checkpointOffset0 = MockCoordinatorStreamWrappedConsumer.CHECKPOINTPREFIX + "mock:" +
-            task0Name.getTaskName() -> (Util.sspToString(checkpoint0.keySet.iterator.next()) + ":" + checkpoint0.values.iterator.next())
-    val checkpointOffset1 = MockCoordinatorStreamWrappedConsumer.CHECKPOINTPREFIX + "mock:" +
-            task1Name.getTaskName() -> (Util.sspToString(checkpoint1.keySet.iterator.next()) + ":" + checkpoint1.values.iterator.next())
     val changelogInfo0 = MockCoordinatorStreamWrappedConsumer.CHANGELOGPREFIX + "mock:" + task0Name.getTaskName() -> "4"
     val changelogInfo1 = MockCoordinatorStreamWrappedConsumer.CHANGELOGPREFIX + "mock:" + task1Name.getTaskName() -> "3"
     val changelogInfo2 = MockCoordinatorStreamWrappedConsumer.CHANGELOGPREFIX + "mock:" + task2Name.getTaskName() -> "5"
@@ -88,8 +79,6 @@ class TestJobCoordinator {
     // Configs which are processed by the MockCoordinatorStream as special configs which are interpreted as
     // SetCheckpoint and SetChangelog
     val otherConfigs = Map(
-      checkpointOffset0,
-      checkpointOffset1,
       changelogInfo0,
       changelogInfo1,
       changelogInfo2
@@ -116,43 +105,32 @@ class TestJobCoordinator {
   }
 
   @Test
-  def testJobCoordinatorCheckpointing = {
+  def testJobCoordinatorChangelogPartitionMapping = {
     System.out.println("test  ")
     val task0Name = new TaskName("Partition 0")
-    val checkpoint0 = Map(new SystemStreamPartition("test", "stream1", new Partition(0)) -> "4")
+    val ssp0 = Set(new SystemStreamPartition("test", "stream1", new Partition(0)))
     val task1Name = new TaskName("Partition 1")
-    val checkpoint1 = Map(new SystemStreamPartition("test", "stream1", new Partition(1)) ->"3")
+    val ssp1 = Set(new SystemStreamPartition("test", "stream1", new Partition(1)))
     val task2Name = new TaskName("Partition 2")
-    val checkpoint2 = Map(new SystemStreamPartition("test", "stream1", new Partition(2)) -> "8")
+    val ssp2 = Set(new SystemStreamPartition("test", "stream1", new Partition(2)))
 
     // Construct the expected JobModel, so we can compare it to
     // JobCoordinator's JobModel.
     val container0Tasks = Map(
-      task0Name -> new TaskModel(task0Name, checkpoint0, new Partition(4)),
-      task2Name -> new TaskModel(task2Name, checkpoint2, new Partition(5)))
+      task0Name -> new TaskModel(task0Name, ssp0, new Partition(4)),
+      task2Name -> new TaskModel(task2Name, ssp2, new Partition(5))) // task2 consumes its own partition (ssp2), not task1's
     val container1Tasks = Map(
-      task1Name -> new TaskModel(task1Name, checkpoint1, new Partition(3)))
+      task1Name -> new TaskModel(task1Name, ssp1, new Partition(3)))
     val containers = Map(
       Integer.valueOf(0) -> new ContainerModel(0, container0Tasks),
       Integer.valueOf(1) -> new ContainerModel(1, container1Tasks))
 
-
-    // The test does not pass offsets for task2 (Partition 2) to the checkpointmanager, this will verify that we get an offset 0 for this partition
-    val checkpointOffset0 = MockCoordinatorStreamWrappedConsumer.CHECKPOINTPREFIX + "mock:" +
-            task0Name.getTaskName() -> (Util.sspToString(checkpoint0.keySet.iterator.next()) + ":" + checkpoint0.values.iterator.next())
-    val checkpointOffset1 = MockCoordinatorStreamWrappedConsumer.CHECKPOINTPREFIX + "mock:" +
-            task1Name.getTaskName() -> (Util.sspToString(checkpoint1.keySet.iterator.next()) + ":" + checkpoint1.values.iterator.next())
-    val checkpointOffset2 = MockCoordinatorStreamWrappedConsumer.CHECKPOINTPREFIX + "mock:" +
-            task2Name.getTaskName() -> (Util.sspToString(checkpoint2.keySet.iterator.next()) + ":" + checkpoint2.values.iterator.next())
     val changelogInfo0 = MockCoordinatorStreamWrappedConsumer.CHANGELOGPREFIX + "mock:" + task0Name.getTaskName() -> "4"
-    val changelogInfo1 = MockCoordinatorStreamWrappedConsumer.CHANGELOGPREFIX + "mock:" + task1Name.getTaskName() -> "3"
-    val changelogInfo2 = MockCoordinatorStreamWrappedConsumer.CHANGELOGPREFIX + "mock:" + task2Name.getTaskName() -> "5"
 
     // Configs which are processed by the MockCoordinatorStream as special configs which are interpreted as
     // SetCheckpoint and SetChangelog
     // Write a couple of checkpoints that the job coordinator will process
     val otherConfigs = Map(
-      checkpointOffset0,
       changelogInfo0
     )
 
@@ -175,62 +153,22 @@ class TestJobCoordinator {
     val url = coordinator.server.getUrl.toString
 
     // Verify if the jobCoordinator has seen the checkpoints
-    var offsets = extractOffsetsFromJobCoordinator(url)
-    assertEquals(1, offsets.size)
-    assertEquals(checkpoint0.head._2, offsets.getOrElse(checkpoint0.head._1, fail()))
-
-    // Write more checkpoints
-    val wrappedConsumer = new MockCoordinatorStreamSystemFactory()
-            .getConsumer(null, null, null)
-            .asInstanceOf[MockCoordinatorStreamWrappedConsumer]
-
-    var moreMessageConfigs = Map(
-      checkpointOffset1
-    )
-
-    wrappedConsumer.addMoreMessages(new MapConfig(moreMessageConfigs))
-
-    // Verify if the coordinator has seen it
-    offsets = extractOffsetsFromJobCoordinator(url)
-    assertEquals(2, offsets.size)
-    assertEquals(checkpoint0.head._2, offsets.getOrElse(checkpoint0.head._1, fail()))
-    assertEquals(checkpoint1.head._2, offsets.getOrElse(checkpoint1.head._1, fail()))
-
-    // Write more checkpoints but block on read on the mock consumer
-    moreMessageConfigs = Map(
-      checkpointOffset2
-    )
-
-    wrappedConsumer.addMoreMessages(new MapConfig(moreMessageConfigs))
-
-    // Simulate consumer being blocked (Job coordinator waiting to read new checkpoints from coordinator after container failure)
-    val latch = wrappedConsumer.blockPool();
-
-    // verify if the port times out
-    var seenException = false
-    try {
-      extractOffsetsFromJobCoordinator(url)
-    }
-    catch {
-      case se: SocketTimeoutException => seenException = true
-    }
-    assertTrue(seenException)
+    val changelogPartitionMapping = extractChangelogPartitionMapping(url)
+    assertEquals(3, changelogPartitionMapping.size)
+    val expectedChangelogPartitionMapping = Map(task0Name -> 4, task1Name -> 5, task2Name -> 6)
+    assertEquals(expectedChangelogPartitionMapping.get(task0Name), changelogPartitionMapping.get(task0Name))
+    assertEquals(expectedChangelogPartitionMapping.get(task1Name), changelogPartitionMapping.get(task1Name))
+    assertEquals(expectedChangelogPartitionMapping.get(task2Name), changelogPartitionMapping.get(task2Name))
 
-    // verify if it has read the new checkpoints after job coordinator has loaded the new checkpoints
-    latch.countDown()
-    offsets = extractOffsetsFromJobCoordinator(url)
-    assertEquals(offsets.size, 3)
-    assertEquals(checkpoint0.head._2, offsets.getOrElse(checkpoint0.head._1, fail()))
-    assertEquals(checkpoint1.head._2, offsets.getOrElse(checkpoint1.head._1, fail()))
-    assertEquals(checkpoint2.head._2, offsets.getOrElse(checkpoint2.head._1, fail()))
     coordinator.stop
   }
 
-  def extractOffsetsFromJobCoordinator(url : String) = {
+  def extractChangelogPartitionMapping(url : String) = {
     val jobModel = SamzaContainer.readJobModel(url.toString)
     val taskModels = jobModel.getContainers.values().flatMap(_.getTasks.values())
-    val offsets = taskModels.flatMap(_.getCheckpointedOffsets).toMap
-    offsets.filter(_._2 != null)
+    taskModels
+      .map(model => model.getTaskName -> model.getChangelogPartition.getPartitionId)
+      .toMap // TaskName -> changelog partition id, as served by the coordinator
   }
 
 

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
index 9036e81..e97656a 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
@@ -24,6 +24,7 @@ import java.io.File
 import org.apache.samza.SamzaException
 import org.apache.samza.config.Config
 import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
+import org.apache.samza.migration.JobRunnerMigration
 import org.junit.Test
 import org.junit.After
 import org.junit.Assert._
@@ -51,8 +52,7 @@ class TestJobRunner {
         "file://%s/src/test/resources/test-migration-fail.properties" format new File(".").getCanonicalPath))
       fail("Should have failed already.")
     } catch {
-      case se: SamzaException => assertEquals(se.getMessage, "Auto checkpoint migration for 0.10.0 upgrade is only supported for Kafka checkpointing system, " +
-        "for everything else, please use the checkpoint tool and remove task.checkpoint.factory configuration")
+      case se: SamzaException => assertEquals(se.getMessage, JobRunnerMigration.UNSUPPORTED_ERROR_MSG)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala b/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
index a1efe6f..3a710a8 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
@@ -56,7 +56,7 @@ class TestProcessJob {
   }
 }
 
-class MockJobCoordinator extends JobCoordinator(null, null, null) {
+class MockJobCoordinator extends JobCoordinator(null, null) {
   var stopped: Boolean = false
 
   override def start: Unit = { }