Posted to commits@samza.apache.org by ni...@apache.org on 2015/09/24 23:37:41 UTC

samza git commit: SAMZA-615: Auto-migrate Kafka checkpoints into coordinator stream

Repository: samza
Updated Branches:
  refs/heads/master 9ab00c188 -> 841839de0


SAMZA-615: Auto-migrate Kafka checkpoints into coordinator stream

Squashed commit of the following:

commit ea64b4e9da0c68849d79353121d2577d27feb709
Author: Yi Pan (Data Infrastructure) <yi...@linkedin.com>
Date:   Tue Sep 22 14:25:52 2015 -0700

    SAMZA-615: Fix unit test in TestJobRunner since there is no guaranteed execution ordering between two tests

commit 3d9d897c152818993c426b00ceed755690baf5d1
Author: Yi Pan (Data Infrastructure) <yi...@linkedin.com>
Date:   Tue Sep 22 11:11:40 2015 -0700

    SAMZA-615: Merged w/ SAMZA-731 and removed all write operations in KafkaCheckpointManager

commit 4d35dc84614374f6efdb911cb9ca7ce18b1f2c76
Author: Yi Pan (Data Infrastructure) <yi...@linkedin.com>
Date:   Wed Sep 16 13:37:11 2015 -0700

    SAMZA-615: Add auto-checkpoint migration for Kafka checkpoints


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/841839de
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/841839de
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/841839de

Branch: refs/heads/master
Commit: 841839de0031f4dc743e1a6f1e795dc18cc13fbc
Parents: 9ab00c1
Author: Yi Pan (Data Infrastructure) <yi...@linkedin.com>
Authored: Thu Sep 24 14:36:23 2015 -0700
Committer: Yi Pan (Data Infrastructure) <yi...@linkedin.com>
Committed: Thu Sep 24 14:36:46 2015 -0700

----------------------------------------------------------------------
 build.gradle                                    |   2 +
 checkstyle/import-control.xml                   |   4 +
 .../stream/CoordinatorStreamSystemConsumer.java |   1 +
 .../stream/CoordinatorStreamSystemProducer.java |   1 +
 .../messages/SetMigrationMetaMessage.java       |  51 +++
 .../apache/samza/migration/MigrationPlan.java   |  27 ++
 .../scala/org/apache/samza/job/JobRunner.scala  |   4 +
 .../samza/migration/JobRunnerMigration.scala    |  52 +++
 .../MockCoordinatorStreamSystemFactory.java     |  17 +-
 .../MockCoordinatorStreamWrappedConsumer.java   |   5 +
 .../samza/storage/TestStorageRecovery.java      |   8 +
 .../resources/test-migration-fail.properties    |  26 ++
 .../org/apache/samza/job/TestJobRunner.scala    |  28 ++
 .../old/checkpoint/KafkaCheckpointLogKey.scala  | 188 ++++++++
 .../old/checkpoint/KafkaCheckpointManager.scala | 337 +++++++++++++++
 .../KafkaCheckpointManagerFactory.scala         | 108 +++++
 .../checkpoint/KafkaCheckpointMigration.scala   |  95 ++++
 .../system/kafka/KafkaSystemProducer.scala      |   1 +
 .../checkpoint/TestKafkaCheckpointManager.scala | 430 +++++++++++++++++++
 19 files changed, 1383 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 3a7fabc..0c38a76 100644
--- a/build.gradle
+++ b/build.gradle
@@ -134,6 +134,7 @@ project(":samza-core_$scalaVersion") {
 
   // Force scala joint compilation
   sourceSets.main.scala.srcDir "src/main/java"
+  sourceSets.test.scala.srcDir "src/test/java"
   sourceSets.main.java.srcDirs = []
 
   jar {
@@ -233,6 +234,7 @@ project(":samza-kafka_$scalaVersion") {
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-all:$mockitoVersion"
     testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
+    testCompile project(":samza-core_$scalaVersion").sourceSets.test.output
 
     // Logging in tests is good.
     testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"

http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index bc07ae8..53cb8b4 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -52,6 +52,10 @@
         </subpackage>
     </subpackage>
 
+    <subpackage name="migration">
+        <allow pkg="org.apache.samza.config" />
+    </subpackage>
+
     <subpackage name="job">
         <allow pkg="org.apache.samza.config" />
         <allow pkg="org.apache.samza.coordinator.stream" />

http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
index 79291af..3113f09 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
@@ -127,6 +127,7 @@ public class CoordinatorStreamSystemConsumer {
   public void stop() {
     log.info("Stopping coordinator stream system consumer.");
     systemConsumer.stop();
+    isStarted = false;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
index 42ae00b..36cf759 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
@@ -91,6 +91,7 @@ public class CoordinatorStreamSystemProducer {
   public void stop() {
     log.info("Stopping coordinator stream producer.");
     systemProducer.stop();
+    isStarted = false;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetMigrationMetaMessage.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetMigrationMetaMessage.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetMigrationMetaMessage.java
new file mode 100644
index 0000000..70fbcb5
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetMigrationMetaMessage.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream.messages;
+/**
+ * The SetMigrationMetaMessage is used to store the migrations that have been performed.
+ * The structure looks like:
+ * {
+ * Key: migration-info
+ * Type: set-migration-info
+ * Source: ContainerID
+ * MessageMap:
+ *  {
+ *     "0910checkpointmigration" : true
+ *  }
+ * }
+ */
+public class SetMigrationMetaMessage extends CoordinatorStreamMessage {
+  public static final String TYPE = "set-migration-info";
+
+  public SetMigrationMetaMessage(CoordinatorStreamMessage message) {
+    super(message.getKeyArray(), message.getMessageMap());
+  }
+
+  public SetMigrationMetaMessage(String source, String metaInfoKey, String metaInfoVal) {
+    super(source);
+    setType(TYPE);
+    setKey("migration-info");
+    putMessageValue(metaInfoKey, metaInfoVal);
+  }
+
+  public String getMetaInfo(String metaInfoKey) {
+    return getMessageValues().get(metaInfoKey);
+  }
+}
\ No newline at end of file
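
For reference, a minimal sketch of how this message type is used: one
constructor writes a key/value pair into the message map, the copy
constructor re-types a raw CoordinatorStreamMessage read back from the
coordinator stream, and getMetaInfo() looks the flag up again. The source
and key below mirror the values KafkaCheckpointMigration uses later in
this patch.

    import org.apache.samza.coordinator.stream.messages.SetMigrationMetaMessage

    // Record that the 0.9 -> 0.10 checkpoint migration has completed.
    val written = new SetMigrationMetaMessage(
      "CHECKPOINTMIGRATION", "CheckpointMigration09to10", "true")

    // Re-type a message read back from the coordinator stream and check
    // the flag in its message map.
    val read = new SetMigrationMetaMessage(written)
    assert(read.getMetaInfo("CheckpointMigration09to10") == "true")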

http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/samza-core/src/main/java/org/apache/samza/migration/MigrationPlan.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/migration/MigrationPlan.java b/samza-core/src/main/java/org/apache/samza/migration/MigrationPlan.java
new file mode 100644
index 0000000..c6ff833
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/migration/MigrationPlan.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.migration;
+
+import org.apache.samza.config.Config;
+
+
+public interface MigrationPlan {
+  void migrate(Config config);
+}
\ No newline at end of file
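
The interface is deliberately tiny: a migration step needs only a
migrate(Config) method and a no-arg constructor, because JobRunnerMigration
(below) loads plans reflectively by class name. A hypothetical no-op
implementation, just to show the required shape:

    import org.apache.samza.config.Config
    import org.apache.samza.migration.MigrationPlan

    // Hypothetical plan: does nothing, but is loadable via Util.getObj.
    class NoOpMigrationPlan extends MigrationPlan {
      override def migrate(config: Config): Unit = {
        // One-time upgrade work driven by the job config would go here.
      }
    }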

http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index 1673217..d6109ec 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -24,6 +24,7 @@ import org.apache.samza.config.Config
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig}
 import org.apache.samza.job.ApplicationStatus.Running
+import org.apache.samza.migration.JobRunnerMigration
 import org.apache.samza.util.CommandLine
 import org.apache.samza.util.Logging
 import org.apache.samza.util.Util
@@ -98,6 +99,9 @@ class JobRunner(config: Config) extends Logging {
     }
     coordinatorSystemProducer.stop
 
+    // Perform any migration plan to run in job runner
+    JobRunnerMigration(config)
+
     // Create the actual job, and submit it.
     val job = jobFactory.getJob(config).submit
 

http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala b/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala
new file mode 100644
index 0000000..374e27e
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.migration
+
+import org.apache.samza.config.Config
+import org.apache.samza.util.{Util, Logging}
+import org.apache.samza.SamzaException
+
+
+object JobRunnerMigration {
+  val CHECKPOINTMIGRATION = "old.checkpoint.KafkaCheckpointMigration"
+  def apply(config: Config) = {
+    val migration = new JobRunnerMigration
+    migration.checkpointMigration(config)
+  }
+}
+
+class JobRunnerMigration extends Logging {
+
+  def checkpointMigration(config: Config) = {
+    val checkpointFactory = Option(config.get("task.checkpoint.factory"))
+    checkpointFactory match {
+      case Some("org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory") =>
+        info("Performing checkpoint migration")
+        val checkpointMigrationPlan = Util.getObj[MigrationPlan](JobRunnerMigration.CHECKPOINTMIGRATION)
+        checkpointMigrationPlan.migrate(config)
+      case None =>
+        info("No task.checkpoint.factory defined, not performing any checkpoint migration")
+      case _ =>
+        val errorMsg = "Auto checkpoint migration for 0.10.0 upgrade is only supported for Kafka checkpointing system, " +
+          "for everything else, please use the checkpoint tool and remove task.checkpoint.factory configuration"
+        error(errorMsg)
+        throw new SamzaException(errorMsg)
+    }
+  }
+}
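
The dispatch keys off task.checkpoint.factory and has three outcomes; a
rough illustration (the MapConfig contents are hypothetical, and the Kafka
branch of course needs a live Kafka/ZooKeeper setup to succeed):

    import org.apache.samza.config.MapConfig
    import org.apache.samza.migration.JobRunnerMigration
    import scala.collection.JavaConversions.mapAsJavaMap

    // No factory configured: logged and skipped.
    JobRunnerMigration(new MapConfig())

    // Kafka factory configured: old.checkpoint.KafkaCheckpointMigration is
    // loaded reflectively and run.
    JobRunnerMigration(new MapConfig(Map("task.checkpoint.factory" ->
      "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory")))

    // Any other factory value throws a SamzaException, as asserted by
    // testJobRunnerMigrationFails in TestJobRunner below.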

http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
index 84ae0b5..9d8c98e 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
@@ -29,9 +29,11 @@ import org.apache.samza.system.SystemFactory;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
 import org.apache.samza.util.Util;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -115,7 +117,7 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
 
     public MockSystemProducer(String expectedSource) {
       this.expectedSource = expectedSource;
-      this.envelopes = new ArrayList<OutgoingMessageEnvelope>();
+      this.envelopes = new ArrayList<>();
     }
 
 
@@ -132,7 +134,18 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
     }
 
     public void send(String source, OutgoingMessageEnvelope envelope) {
-      envelopes.add(envelope);
+      if (mockConsumer != null) {
+        MockCoordinatorStreamWrappedConsumer consumer = (MockCoordinatorStreamWrappedConsumer) mockConsumer;
+        SystemStreamPartition ssp = new SystemStreamPartition(envelope.getSystemStream(), new Partition(0));
+        consumer.register(ssp, "");
+        try {
+          consumer.addMessageEnvelope(new IncomingMessageEnvelope(ssp, "", envelope.getKey(), envelope.getMessage()));
+        } catch (IOException | InterruptedException e) {
+          e.printStackTrace();
+        }
+      } else {
+        envelopes.add(envelope);
+      }
     }
 
     public void flush(String source) {

http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
index 47a44b1..dd04d28 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.coordinator.stream;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -67,6 +68,10 @@ public class MockCoordinatorStreamWrappedConsumer extends BlockingEnvelopeMap {
     convertConfigToCoordinatorMessage(config);
   }
 
+  public void addMessageEnvelope(IncomingMessageEnvelope envelope) throws IOException, InterruptedException {
+    put(systemStreamPartition, envelope);
+    setIsAtHead(systemStreamPartition, true);
+  }
 
   private void convertConfigToCoordinatorMessage(Config config) {
     try {

http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java b/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
index 52d450d..53207ad 100644
--- a/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
+++ b/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
@@ -35,6 +35,7 @@ import org.apache.samza.system.SystemConsumer;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
 import org.apache.samza.system.SystemStreamPartition;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -70,8 +71,15 @@ public class TestStorageRecovery {
     when(systemAdmin.getSystemStreamMetadata(set2)).thenReturn(ssmMap);
   }
 
+  @After
+  public void teardown() {
+    MockCoordinatorStreamSystemFactory.disableMockConsumerCache();
+  }
+
   @Test
   public void testStorageEngineReceivedAllValues() {
+    MockCoordinatorStreamSystemFactory.enableMockConsumerCache();
+
     String path = "/tmp/testing";
     StorageRecovery storageRecovery = new StorageRecovery(config, path);
     storageRecovery.run();

http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/samza-core/src/test/resources/test-migration-fail.properties
----------------------------------------------------------------------
diff --git a/samza-core/src/test/resources/test-migration-fail.properties b/samza-core/src/test/resources/test-migration-fail.properties
new file mode 100644
index 0000000..b0657de
--- /dev/null
+++ b/samza-core/src/test/resources/test-migration-fail.properties
@@ -0,0 +1,26 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+
+job.factory.class=org.apache.samza.job.MockJobFactory
+job.name=test-job
+foo=bar
+systems.coordinator.samza.factory=org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
+task.checkpoint.factory=org.apache.samza.checkpoint.file.FileSystemCheckpointManagerFactory

http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
index 52057ed..9036e81 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
@@ -21,8 +21,11 @@ package org.apache.samza.job
 
 import java.io.File
 
+import org.apache.samza.SamzaException
 import org.apache.samza.config.Config
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
 import org.junit.Test
+import org.junit.After
 import org.junit.Assert._
 
 object TestJobRunner {
@@ -30,8 +33,33 @@ object TestJobRunner {
 }
 
 class TestJobRunner {
+
+  @After
+  def teardown {
+    MockCoordinatorStreamSystemFactory.disableMockConsumerCache()
+  }
+
+  @Test
+  def testJobRunnerMigrationFails {
+    MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
+
+    try {
+      JobRunner.main(Array(
+        "--config-factory",
+        "org.apache.samza.config.factories.PropertiesConfigFactory",
+        "--config-path",
+        "file://%s/src/test/resources/test-migration-fail.properties" format new File(".").getCanonicalPath))
+      fail("Should have failed already.")
+    } catch {
+      case se: SamzaException => assertEquals(se.getMessage, "Auto checkpoint migration for 0.10.0 upgrade is only supported for Kafka checkpointing system, " +
+        "for everything else, please use the checkpoint tool and remove task.checkpoint.factory configuration")
+    }
+  }
+
   @Test
   def testJobRunnerWorks {
+    MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
+
     JobRunner.main(Array(
       "--config-factory",
       "org.apache.samza.config.factories.PropertiesConfigFactory",

http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointLogKey.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointLogKey.scala b/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointLogKey.scala
new file mode 100644
index 0000000..958d07c
--- /dev/null
+++ b/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointLogKey.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package old.checkpoint
+
+import java.util
+
+import org.apache.samza.SamzaException
+import org.apache.samza.container.TaskName
+import org.codehaus.jackson.`type`.TypeReference
+import org.codehaus.jackson.map.ObjectMapper
+
+import scala.collection.JavaConversions._
+
+/**
+ * Kafka Checkpoint Log-specific key used to identify what type of entry is
+ * written for any particular log entry.
+ *
+ * @param map Backing map to hold key values
+ */
+class KafkaCheckpointLogKey private (val map: Map[String, String]) {
+  // This might be better as a case class...
+  import KafkaCheckpointLogKey._
+
+  /**
+   * Serialize this key to bytes
+   * @return Key as bytes
+   */
+  def toBytes(): Array[Byte] = {
+    val jMap = new util.HashMap[String, String](map.size)
+    jMap.putAll(map)
+
+    JSON_MAPPER.writeValueAsBytes(jMap)
+  }
+
+  private def getKey = map.getOrElse(CHECKPOINT_KEY_KEY, throw new SamzaException("No " + CHECKPOINT_KEY_KEY  + " in map for Kafka Checkpoint log key"))
+
+  /**
+   * Is this key for a checkpoint entry?
+   *
+   * @return true iff this key's entry is for a checkpoint
+   */
+  def isCheckpointKey = getKey.equals(CHECKPOINT_KEY_TYPE)
+
+  /**
+   * Is this key for a changelog partition mapping?
+   *
+   * @return true iff this key's entry is for a changelog partition mapping
+   */
+  def isChangelogPartitionMapping = getKey.equals(CHANGELOG_PARTITION_KEY_TYPE)
+
+  /**
+   * If this Key is for a checkpoint entry, return its associated TaskName.
+   *
+   * @return TaskName for this checkpoint or throw an exception if this key does not have a TaskName entry
+   */
+  def getCheckpointTaskName = {
+    val asString = map.getOrElse(CHECKPOINT_TASKNAME_KEY, throw new SamzaException("No TaskName in checkpoint key: " + this))
+    new TaskName(asString)
+  }
+
+  def canEqual(other: Any): Boolean = other.isInstanceOf[KafkaCheckpointLogKey]
+
+  override def equals(other: Any): Boolean = other match {
+    case that: KafkaCheckpointLogKey =>
+      (that canEqual this) &&
+        map == that.map
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    val state = Seq(map)
+    state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
+  }
+}
+
+object KafkaCheckpointLogKey {
+  /**
+   *  Messages in the checkpoint log have keys associated with them. These keys are maps that describe the message's
+   *  type, either a checkpoint or a changelog-partition-mapping.
+   */
+  val CHECKPOINT_KEY_KEY = "type"
+  val CHECKPOINT_KEY_TYPE = "checkpoint"
+  val CHANGELOG_PARTITION_KEY_TYPE = "changelog-partition-mapping"
+  val CHECKPOINT_TASKNAME_KEY = "taskName"
+  val SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY = "systemstreampartition-grouper-factory"
+
+  /**
+   * Partition mapping keys have no dynamic values, so we just need one instance.
+   */
+  val CHANGELOG_PARTITION_MAPPING_KEY = new KafkaCheckpointLogKey(Map(CHECKPOINT_KEY_KEY -> CHANGELOG_PARTITION_KEY_TYPE))
+
+  private val JSON_MAPPER = new ObjectMapper()
+  val KEY_TYPEREFERENCE = new TypeReference[util.HashMap[String, String]]() {}
+
+  var systemStreamPartitionGrouperFactoryString:Option[String] = None
+
+  /**
+   * Set the name of the factory configured to provide the SystemStreamPartition grouping
+   * so it can be included in the key.
+   *
+   * @param str Config value of SystemStreamPartition Grouper Factory
+   */
+  def setSystemStreamPartitionGrouperFactoryString(str:String) = {
+    systemStreamPartitionGrouperFactoryString = Some(str)
+  }
+
+  /**
+   * Get the name of the factory configured to provide the SystemStreamPartition grouping
+   * so it can be included in the key.
+   */
+  def getSystemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString.getOrElse(throw new SamzaException("No SystemStreamPartition grouping factory string has been set."))
+
+  /**
+   * Build a key for a checkpoint log entry for a particular TaskName
+   * @param taskName TaskName to build for this checkpoint entry
+   *
+   * @return Key for checkpoint log entry
+   */
+  def getCheckpointKey(taskName:TaskName) = {
+    val map = Map(CHECKPOINT_KEY_KEY -> CHECKPOINT_KEY_TYPE,
+      CHECKPOINT_TASKNAME_KEY -> taskName.getTaskName,
+      SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY -> getSystemStreamPartitionGrouperFactoryString)
+
+    new KafkaCheckpointLogKey(map)
+  }
+
+  /**
+   * Build a key for a changelog partition mapping entry
+   *
+   * @return Key for changelog partition mapping entry
+   */
+  def getChangelogPartitionMappingKey() = CHANGELOG_PARTITION_MAPPING_KEY
+
+  /**
+   * Deserialize a Kafka checkpoint log key
+   * @param bytes Serialized (via JSON) Kafka checkpoint log key
+   * @return Checkpoint log key
+   */
+  def fromBytes(bytes: Array[Byte]): KafkaCheckpointLogKey = {
+    try {
+      val jmap: util.HashMap[String, String] = JSON_MAPPER.readValue(bytes, KEY_TYPEREFERENCE)
+
+      if(!jmap.containsKey(CHECKPOINT_KEY_KEY)) {
+        throw new SamzaException("No type entry in checkpoint key: " + jmap)
+      }
+
+      // Only checkpoint keys have ssp grouper factory keys
+      if(jmap.get(CHECKPOINT_KEY_KEY).equals(CHECKPOINT_KEY_TYPE)) {
+        val sspGrouperFactory = jmap.get(SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY)
+
+        if (sspGrouperFactory == null) {
+          throw new SamzaException("No SystemStreamPartition Grouper factory entry in checkpoint key: " + jmap)
+        }
+
+        if (!sspGrouperFactory.equals(getSystemStreamPartitionGrouperFactoryString)) {
+          throw new DifferingSystemStreamPartitionGrouperFactoryValues(sspGrouperFactory, getSystemStreamPartitionGrouperFactoryString)
+        }
+      }
+
+      new KafkaCheckpointLogKey(jmap.toMap)
+    } catch {
+      case e: Exception =>
+        throw new SamzaException("Exception while deserializing checkpoint key", e)
+    }
+  }
+}
+
+class DifferingSystemStreamPartitionGrouperFactoryValues(inKey:String, inConfig:String) extends SamzaException {
+  override def getMessage() = "Checkpoint key's SystemStreamPartition Grouper factory (" + inKey +
+    ") does not match value from current configuration (" + inConfig + ").  " +
+    "This likely means the SystemStreamPartitionGrouper was changed between job runs, which is not supported."
+}
\ No newline at end of file
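
A small round-trip sketch of the key serde above. The grouper factory
string must be set first, or getCheckpointKey and fromBytes will throw;
GroupByPartitionFactory and the task name are just example values:

    import old.checkpoint.KafkaCheckpointLogKey
    import org.apache.samza.container.TaskName

    KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString(
      "org.apache.samza.container.grouper.stream.GroupByPartitionFactory")

    // Build a checkpoint key, serialize it to JSON bytes, and parse it back.
    val key = KafkaCheckpointLogKey.getCheckpointKey(new TaskName("Partition 0"))
    val parsed = KafkaCheckpointLogKey.fromBytes(key.toBytes())

    assert(parsed.isCheckpointKey)
    assert(parsed.getCheckpointTaskName == new TaskName("Partition 0"))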

http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManager.scala
new file mode 100644
index 0000000..627631a
--- /dev/null
+++ b/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManager.scala
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package old.checkpoint
+
+import java.nio.ByteBuffer
+import java.util
+import java.util.Properties
+
+import kafka.admin.AdminUtils
+import kafka.api._
+import kafka.common.{ErrorMapping, InvalidMessageSizeException, TopicAndPartition, TopicExistsException, UnknownTopicOrPartitionException}
+import kafka.consumer.SimpleConsumer
+import kafka.message.InvalidMessageException
+import kafka.utils.Utils
+import org.I0Itec.zkclient.ZkClient
+import org.apache.kafka.clients.producer.{Producer, ProducerRecord}
+import org.apache.samza.SamzaException
+import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
+import org.apache.samza.container.TaskName
+import org.apache.samza.serializers.CheckpointSerde
+import org.apache.samza.system.kafka.TopicMetadataCache
+import org.apache.samza.util.{ExponentialSleepStrategy, KafkaUtil, Logging, TopicMetadataStore}
+
+import scala.collection.mutable
+
+/**
+ * Kafka checkpoint manager is used to store checkpoints in a Kafka topic.
+ * To read a checkpoint for a specific taskName, we find the newest message
+ * keyed to that taskName. If there is no such message, no checkpoint data
+ * exists.  The underlying log has a single partition into which all
+ * checkpoints and TaskName to changelog partition mappings are written.
+ */
+class KafkaCheckpointManager(clientId: String,
+                              checkpointTopic: String,
+                              systemName: String,
+                              socketTimeout: Int,
+                              bufferSize: Int,
+                              fetchSize: Int,
+                              metadataStore: TopicMetadataStore,
+                              connectProducer: () => Producer[Array[Byte], Array[Byte]],
+                              connectZk: () => ZkClient,
+                              systemStreamPartitionGrouperFactoryString: String,
+                              retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
+                              serde: CheckpointSerde = new CheckpointSerde,
+                              checkpointTopicProperties: Properties = new Properties) extends Logging {
+  import KafkaCheckpointManager._
+
+  var taskNames = Set[TaskName]()
+  var producer: Producer[Array[Byte], Array[Byte]] = null
+  var taskNamesToOffsets: Map[TaskName, Checkpoint] = null
+
+  var startingOffset: Option[Long] = None // Where to start reading for each subsequent call of readCheckpoint
+
+  KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString(systemStreamPartitionGrouperFactoryString)
+
+  info("Creating KafkaCheckpointManager with: clientId=%s, checkpointTopic=%s, systemName=%s" format(clientId, checkpointTopic, systemName))
+
+  private def getConsumer(): SimpleConsumer = {
+    val metadataMap = TopicMetadataCache.getTopicMetadata(Set(checkpointTopic), systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics))
+    val metadata = metadataMap(checkpointTopic)
+    val partitionMetadata = metadata.partitionsMetadata
+      .filter(_.partitionId == 0)
+      .headOption
+      .getOrElse(throw new KafkaCheckpointException("Tried to find partition information for partition 0 for checkpoint topic, but it didn't exist in Kafka."))
+    val leader = partitionMetadata
+      .leader
+      .getOrElse(throw new SamzaException("No leader available for topic %s" format checkpointTopic))
+
+    info("Connecting to leader %s:%d for topic %s and to fetch all checkpoint messages." format(leader.host, leader.port, checkpointTopic))
+
+    new SimpleConsumer(leader.host, leader.port, socketTimeout, bufferSize, clientId)
+  }
+
+  private def getEarliestOffset(consumer: SimpleConsumer, topicAndPartition: TopicAndPartition): Long = consumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, -1)
+
+  private def getOffset(consumer: SimpleConsumer, topicAndPartition: TopicAndPartition, earliestOrLatest: Long): Long = {
+    val offsetResponse = consumer.getOffsetsBefore(new OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1))))
+      .partitionErrorAndOffsets
+      .get(topicAndPartition)
+      .getOrElse(throw new KafkaCheckpointException("Unable to find offset information for %s:0" format checkpointTopic))
+    // Fail or retry if there was an issue with the offset request.
+    KafkaUtil.maybeThrowException(offsetResponse.error)
+
+    val offset: Long = offsetResponse
+      .offsets
+      .headOption
+      .getOrElse(throw new KafkaCheckpointException("Got response, but no offsets defined for %s:0" format checkpointTopic))
+
+    offset
+  }
+
+  /**
+   * Read the last checkpoint for specified TaskName
+   *
+   * @param taskName Specific Samza taskName for which to get the last checkpoint of.
+   */
+  def readLastCheckpoint(taskName: TaskName): Checkpoint = {
+    if (!taskNames.contains(taskName)) {
+      throw new SamzaException(taskName + " not registered with this CheckpointManager")
+    }
+
+    info("Reading checkpoint for taskName " + taskName)
+
+    if (taskNamesToOffsets == null) {
+      info("No TaskName to checkpoint mapping provided.  Reading for first time.")
+      taskNamesToOffsets = readCheckpointsFromLog()
+    } else {
+      info("Already existing checkpoint mapping.  Merging new offsets")
+      taskNamesToOffsets ++= readCheckpointsFromLog()
+    }
+
+    val checkpoint = taskNamesToOffsets.get(taskName).orNull
+
+    info("Got checkpoint state for taskName %s: %s" format(taskName, checkpoint))
+
+    checkpoint
+  }
+
+  /**
+   * Read through entire log, discarding changelog mapping, and building map of TaskNames to Checkpoints
+   */
+  def readCheckpointsFromLog(): Map[TaskName, Checkpoint] = {
+    val checkpoints = mutable.Map[TaskName, Checkpoint]()
+
+    def shouldHandleEntry(key: KafkaCheckpointLogKey) = key.isCheckpointKey
+
+    def handleCheckpoint(payload: ByteBuffer, checkpointKey:KafkaCheckpointLogKey): Unit = {
+      val taskName = checkpointKey.getCheckpointTaskName
+      val checkpoint = serde.fromBytes(Utils.readBytes(payload))
+      debug("Adding checkpoint " + checkpoint + " for taskName " + taskName)
+      checkpoints.put(taskName, checkpoint) // replacing any existing, older checkpoints as we go
+    }
+
+    readLog(CHECKPOINT_LOG4J_ENTRY, shouldHandleEntry, handleCheckpoint)
+
+    checkpoints.toMap /* of the immutable kind */
+  }
+
+  /**
+   * Read through entire log, discarding checkpoints, finding latest changelogPartitionMapping
+   *
+   * Lots of duplicated code from the checkpoint method, but will be better to refactor this code into AM-based
+   * checkpoint log reading
+   */
+  def readChangeLogPartitionMapping(): util.Map[TaskName, java.lang.Integer] = {
+    var changelogPartitionMapping: util.Map[TaskName, java.lang.Integer] = new util.HashMap[TaskName, java.lang.Integer]()
+
+    def shouldHandleEntry(key: KafkaCheckpointLogKey) = key.isChangelogPartitionMapping
+
+    def handleCheckpoint(payload: ByteBuffer, checkpointKey:KafkaCheckpointLogKey): Unit = {
+      changelogPartitionMapping = serde.changelogPartitionMappingFromBytes(Utils.readBytes(payload))
+
+      debug("Adding changelog partition mapping" + changelogPartitionMapping)
+    }
+
+    readLog(CHANGELOG_PARTITION_MAPPING_LOG4J, shouldHandleEntry, handleCheckpoint)
+
+    changelogPartitionMapping
+  }
+
+  /**
+   * Common code for reading both the changelog partition mapping and the checkpoints
+   *
+   * @param entryType Description of the entry type being read, used in log output
+   * @param shouldHandleEntry Predicate deciding whether an entry with the given key should be handled
+   * @param handleEntry Code to handle an entry in the log once it's found
+   */
+  private def readLog(entryType:String, shouldHandleEntry: (KafkaCheckpointLogKey) => Boolean,
+                      handleEntry: (ByteBuffer, KafkaCheckpointLogKey) => Unit): Unit = {
+    retryBackoff.run[Unit](
+      loop => {
+        val consumer = getConsumer()
+
+        val topicAndPartition = new TopicAndPartition(checkpointTopic, 0)
+
+        try {
+          var offset = startingOffset.getOrElse(getEarliestOffset(consumer, topicAndPartition))
+
+          info("Got offset %s for topic %s and partition 0. Attempting to fetch messages for %s." format(offset, checkpointTopic, entryType))
+
+          val latestOffset = getOffset(consumer, topicAndPartition, OffsetRequest.LatestTime)
+
+          info("Get latest offset %s for topic %s and partition 0." format(latestOffset, checkpointTopic))
+
+          if (offset < 0) {
+            info("Got offset 0 (no messages in %s) for topic %s and partition 0, so returning empty collection. If you expected the checkpoint topic to have messages, you're probably going to lose data." format (entryType, checkpointTopic))
+            return
+          }
+
+          while (offset < latestOffset) {
+            val request = new FetchRequestBuilder()
+              .addFetch(checkpointTopic, 0, offset, fetchSize)
+              .maxWait(500)
+              .minBytes(1)
+              .clientId(clientId)
+              .build
+
+            val fetchResponse = consumer.fetch(request)
+            if (fetchResponse.hasError) {
+              warn("Got error code from broker for %s: %s" format(checkpointTopic, fetchResponse.errorCode(checkpointTopic, 0)))
+              val errorCode = fetchResponse.errorCode(checkpointTopic, 0)
+              if (ErrorMapping.OffsetOutOfRangeCode.equals(errorCode)) {
+                warn("Got an offset out of range exception while getting last entry in %s for topic %s and partition 0, so returning a null offset to the KafkaConsumer. Let it decide what to do based on its autooffset.reset setting." format (entryType, checkpointTopic))
+                return
+              }
+              KafkaUtil.maybeThrowException(errorCode)
+            }
+
+            for (response <- fetchResponse.messageSet(checkpointTopic, 0)) {
+              offset = response.nextOffset
+              startingOffset = Some(offset) // For next time we call
+
+              if (!response.message.hasKey) {
+                throw new KafkaCheckpointException("Encountered message without key.")
+              }
+
+              val checkpointKey = KafkaCheckpointLogKey.fromBytes(Utils.readBytes(response.message.key))
+
+              if (!shouldHandleEntry(checkpointKey)) {
+                debug("Skipping " + entryType + " entry with key " + checkpointKey)
+              } else {
+                handleEntry(response.message.payload, checkpointKey)
+              }
+            }
+          }
+        } finally {
+          consumer.close()
+        }
+
+        loop.done
+        Unit
+      },
+
+      (exception, loop) => {
+        exception match {
+          case e: InvalidMessageException => throw new KafkaCheckpointException("Got InvalidMessageException from Kafka, which is unrecoverable, so fail the samza job", e)
+          case e: InvalidMessageSizeException => throw new KafkaCheckpointException("Got InvalidMessageSizeException from Kafka, which is unrecoverable, so fail the samza job", e)
+          case e: UnknownTopicOrPartitionException => throw new KafkaCheckpointException("Got UnknownTopicOrPartitionException from Kafka, which is unrecoverable, so fail the samza job", e)
+          case e: KafkaCheckpointException => throw e
+          case e: Exception =>
+            warn("While trying to read last %s entry for topic %s and partition 0: %s. Retrying." format(entryType, checkpointTopic, e))
+            debug("Exception detail:", e)
+        }
+      }
+    ).getOrElse(throw new SamzaException("Failed to get entries for " + entryType + " from topic " + checkpointTopic))
+
+  }
+
+  def topicExists: Boolean = {
+    val zkClient = connectZk()
+    try {
+      AdminUtils.topicExists(zkClient, checkpointTopic)
+    } finally {
+      zkClient.close()
+    }
+  }
+
+  def start {
+    if (topicExists) {
+      validateTopic
+    } else {
+      throw new SamzaException("Failed to start KafkaCheckpointManager for non-existing checkpoint topic. KafkaCheckpointManager should only be used for migration purpose.")
+    }
+  }
+
+  def register(taskName: TaskName) {
+    debug("Adding taskName " + taskName + " to " + this)
+    taskNames += taskName
+  }
+
+  def stop = {
+    if (producer != null) {
+      producer.close
+    }
+  }
+
+  def validateTopic =  {
+    info("Validating checkpoint topic %s." format checkpointTopic)
+    retryBackoff.run(
+      loop => {
+        val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(checkpointTopic), systemName, metadataStore.getTopicInfo)
+        val topicMetadata = topicMetadataMap(checkpointTopic)
+        KafkaUtil.maybeThrowException(topicMetadata.errorCode)
+
+        val partitionCount = topicMetadata.partitionsMetadata.length
+        if (partitionCount != 1) {
+          throw new KafkaCheckpointException("Checkpoint topic validation failed for topic %s because partition count %s did not match expected partition count of 1." format(checkpointTopic, topicMetadata.partitionsMetadata.length))
+        }
+
+        info("Successfully validated checkpoint topic %s." format checkpointTopic)
+        loop.done
+      },
+
+      (exception, loop) => {
+        exception match {
+          case e: KafkaCheckpointException => throw e
+          case e: Exception =>
+            warn("While trying to validate topic %s: %s. Retrying." format(checkpointTopic, e))
+            debug("Exception detail:", e)
+        }
+      }
+    )
+  }
+
+  override def toString = "KafkaCheckpointManager [systemName=%s, checkpointTopic=%s]" format(systemName, checkpointTopic)
+}
+
+object KafkaCheckpointManager {
+  val CHECKPOINT_LOG4J_ENTRY = "checkpoint log"
+  val CHANGELOG_PARTITION_MAPPING_LOG4J = "changelog partition mapping"
+}
+
+/**
+ * KafkaCheckpointManager handles retries, so we need two kinds of exceptions:
+ * one to signal a hard failure, and the other to retry. The
+ * KafkaCheckpointException is thrown to indicate a hard failure that the Kafka
+ * CheckpointManager can't recover from.
+ */
+class KafkaCheckpointException(s: String, t: Throwable) extends SamzaException(s, t) {
+  def this(s: String) = this(s, null)
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManagerFactory.scala
new file mode 100644
index 0000000..189752a
--- /dev/null
+++ b/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManagerFactory.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package old.checkpoint
+
+import java.util.Properties
+
+import kafka.utils.ZKStringSerializer
+import org.I0Itec.zkclient.ZkClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.samza.SamzaException
+import org.apache.samza.checkpoint.CheckpointManager
+import org.apache.samza.config.Config
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.config.KafkaConfig.Config2Kafka
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtil, Logging}
+
+object KafkaCheckpointManagerFactory {
+  /**
+   * Version number to track the format of the checkpoint log
+   */
+  val CHECKPOINT_LOG_VERSION_NUMBER = 1
+
+  val INJECTED_PRODUCER_PROPERTIES = Map(
+    "acks" -> "all",
+    // Forcibly disable compression because Kafka doesn't support compression
+    // on log compacted topics. Details in SAMZA-586.
+    "compression.type" -> "none")
+
+  // Set the checkpoint topic configs to have a very small segment size and
+  // enable log compaction. This keeps job startup time small since there
+  // are fewer useless (overwritten) messages to read from the checkpoint
+  // topic.
+  def getCheckpointTopicProperties(config: Config) = {
+    val segmentBytes = "26214400"
+
+    (new Properties /: Map(
+      "cleanup.policy" -> "compact",
+      "segment.bytes" -> segmentBytes)) { case (props, (k, v)) => props.put(k, v); props }
+  }
+}
+
+class KafkaCheckpointManagerFactory extends Logging {
+  import KafkaCheckpointManagerFactory._
+
+  def getCheckpointManager(config: Config, registry: MetricsRegistry): KafkaCheckpointManager = {
+    val clientId = KafkaUtil.getClientId("samza-checkpoint-manager", config)
+    val systemName = Option(config.get("task.checkpoint.system")).getOrElse(throw new SamzaException("no system defined for checkpoint manager, cannot perform migration."))
+    val producerConfig = config.getKafkaSystemProducerConfig(
+      systemName,
+      clientId,
+      INJECTED_PRODUCER_PROPERTIES)
+    val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
+    val socketTimeout = consumerConfig.socketTimeoutMs
+    val bufferSize = consumerConfig.socketReceiveBufferBytes
+    val fetchSize = consumerConfig.fetchMessageMaxBytes // must be > buffer size
+
+    val connectProducer = () => {
+      new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties)
+    }
+    val zkConnect = Option(consumerConfig.zkConnect)
+      .getOrElse(throw new SamzaException("no zookeeper.connect defined in config"))
+    val connectZk = () => {
+      new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
+    }
+    val jobName = config.getName.getOrElse(throw new SamzaException("Missing job name in configs"))
+    val jobId = config.getJobId.getOrElse("1")
+    val bootstrapServers = producerConfig.bootsrapServers
+    val metadataStore = new ClientUtilTopicMetadataStore(bootstrapServers, clientId, socketTimeout)
+    val checkpointTopic = getTopic(jobName, jobId)
+
+    // Find out the SSPGrouperFactory class so it can be included/verified in the key
+    val systemStreamPartitionGrouperFactoryString = config.getSystemStreamPartitionGrouperFactory
+
+    new KafkaCheckpointManager(
+      clientId,
+      checkpointTopic,
+      systemName,
+      socketTimeout,
+      bufferSize,
+      fetchSize,
+      metadataStore,
+      connectProducer,
+      connectZk,
+      systemStreamPartitionGrouperFactoryString,
+      checkpointTopicProperties = getCheckpointTopicProperties(config))
+  }
+
+  private def getTopic(jobName: String, jobId: String) =
+    "__samza_checkpoint_ver_%d_for_%s_%s" format (CHECKPOINT_LOG_VERSION_NUMBER, jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-"))
+}
\ No newline at end of file
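
getTopic is private, but the naming scheme is simple enough to show inline:
the version number, then the job name and id with underscores replaced by
dashes (presumably so they cannot be confused with the separator
underscores). For a hypothetical job named "my_job" with id "1":

    val topic = "__samza_checkpoint_ver_%d_for_%s_%s" format
      (1, "my_job".replaceAll("_", "-"), "1".replaceAll("_", "-"))
    // topic == "__samza_checkpoint_ver_1_for_my-job_1"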

http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointMigration.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointMigration.scala b/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointMigration.scala
new file mode 100644
index 0000000..2e10815
--- /dev/null
+++ b/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointMigration.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package old.checkpoint
+
+import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
+import org.apache.samza.config.Config
+import org.apache.samza.container.TaskName
+import org.apache.samza.coordinator.JobCoordinator
+import org.apache.samza.coordinator.stream.messages.{CoordinatorStreamMessage, SetMigrationMetaMessage}
+import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, CoordinatorStreamSystemProducer, CoordinatorStreamSystemFactory}
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.migration.MigrationPlan
+import org.apache.samza.storage.ChangelogPartitionManager
+import org.apache.samza.util.{Logging, NoOpMetricsRegistry}
+import scala.collection.JavaConverters._
+
+class KafkaCheckpointMigration extends MigrationPlan with Logging {
+  val source = "CHECKPOINTMIGRATION"
+  val migrationKey = "CheckpointMigration09to10"
+  val migrationVal = "true"
+
+  def migrate(config: Config, getManager:() => KafkaCheckpointManager): Unit = {
+    val coordinatorFactory = new CoordinatorStreamSystemFactory
+    val coordinatorSystemProducer = coordinatorFactory.getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap)
+    var manager = getManager()
+    // make sure to validate that we only perform migration when the checkpoint topic exists
+    if (manager.topicExists) {
+      manager.validateTopic
+      val checkpointMap = manager.readCheckpointsFromLog()
+      manager.stop
+
+      manager = getManager()
+      val changelogMap = manager.readChangeLogPartitionMapping()
+      manager.stop
+
+      val coordinatorSystemConsumer = coordinatorFactory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
+      if (migrationVerification(coordinatorSystemConsumer)) {
+        info("Migration %s was already performed, doing nothing" format migrationKey)
+        return
+      }
+      info("No previous migration for %s were detected, performing migration" format migrationKey)
+      val checkpointManager = new CheckpointManager(coordinatorSystemProducer, coordinatorSystemConsumer, source)
+      checkpointManager.start()
+      checkpointMap.foreach { case (taskName: TaskName, checkpoint: Checkpoint) => checkpointManager.writeCheckpoint(taskName, checkpoint) }
+      val changelogPartitionManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, source)
+      changelogPartitionManager.start()
+      changelogPartitionManager.writeChangeLogPartitionMapping(changelogMap)
+      changelogPartitionManager.stop()
+      checkpointManager.stop()
+    }
+    migrationCompletionMark(coordinatorSystemProducer)
+  }
+
+  override def migrate(config: Config) {
+    val factory = new KafkaCheckpointManagerFactory
+    def getManager() = factory.getCheckpointManager(config, new NoOpMetricsRegistry)
+    migrate(config, getManager)
+  }
+
+  def migrationVerification(coordinatorSystemConsumer : CoordinatorStreamSystemConsumer): Boolean = {
+    coordinatorSystemConsumer.register()
+    coordinatorSystemConsumer.start()
+    coordinatorSystemConsumer.bootstrap()
+    val stream = coordinatorSystemConsumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE)
+    coordinatorSystemConsumer.stop()
+    val message = new SetMigrationMetaMessage(source, migrationKey, migrationVal)
+    stream.contains(message.asInstanceOf[CoordinatorStreamMessage])
+  }
+
+  def migrationCompletionMark(coordinatorSystemProducer: CoordinatorStreamSystemProducer) = {
+    info("Marking completion of migration %s" format migrationKey)
+    val message = new SetMigrationMetaMessage(source, migrationKey, migrationVal)
+    coordinatorSystemProducer.start()
+    coordinatorSystemProducer.send(message)
+    coordinatorSystemProducer.stop()
+  }
+
+}
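
End to end, this is the plan JobRunnerMigration loads for Kafka-checkpointed
jobs. A sketch of driving it directly, mirroring the reflective call in
JobRunnerMigration; the config is assumed to carry task.checkpoint.system,
zookeeper.connect, and a job name, per KafkaCheckpointManagerFactory above:

    import org.apache.samza.config.Config
    import org.apache.samza.migration.MigrationPlan
    import org.apache.samza.util.Util

    def runCheckpointMigration(config: Config): Unit = {
      // Load the plan by class name, exactly as JobRunnerMigration does.
      val plan = Util.getObj[MigrationPlan]("old.checkpoint.KafkaCheckpointMigration")
      // Idempotent: if the SetMigrationMetaMessage marker is already in the
      // coordinator stream, migrate() skips the rewrite.
      plan.migrate(config)
    }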

http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
index c0c34bf..9a44d46 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
@@ -55,6 +55,7 @@ class KafkaSystemProducer(systemName: String,
     if (producer != null) {
       latestFuture.keys.foreach(flush(_))
       producer.close
+      producer = null
     }
   }
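
The one-line change above nulls out the producer reference after close, so a
repeated stop() becomes a no-op and a later start() can safely recreate the
producer. A minimal sketch of that guard pattern, with illustrative names
rather than Samza's own:

    // Holds a lazily created resource and guarantees close() runs at most once.
    class ProducerHolder[P <: AutoCloseable](create: () => P) {
      private var producer: P = null.asInstanceOf[P]

      def start(): Unit = {
        if (producer == null) producer = create()
      }

      def stop(): Unit = {
        if (producer != null) {
          producer.close()  // release sockets and buffers exactly once
          producer = null   // guard against double-close on repeated stop()
        }
      }
    }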
 

http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/samza-kafka/src/test/scala/old/checkpoint/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/old/checkpoint/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/old/checkpoint/TestKafkaCheckpointManager.scala
new file mode 100644
index 0000000..2c0304f
--- /dev/null
+++ b/samza-kafka/src/test/scala/old/checkpoint/TestKafkaCheckpointManager.scala
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package old.checkpoint
+
+import java.util.Properties
+import kafka.admin.AdminUtils
+import kafka.common.{InvalidMessageSizeException, UnknownTopicOrPartitionException}
+import kafka.message.InvalidMessageException
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.{TestUtils, TestZKUtils, Utils, ZKStringSerializer}
+import kafka.zk.EmbeddedZookeeper
+import org.I0Itec.zkclient.ZkClient
+import org.apache.kafka.clients.producer.{Producer, ProducerRecord, KafkaProducer, ProducerConfig}
+import org.apache.samza.checkpoint.{CheckpointManager, Checkpoint}
+import org.apache.samza.config.SystemConfig._
+import org.apache.samza.config.{JobConfig, KafkaProducerConfig, MapConfig}
+import org.apache.samza.container.TaskName
+import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
+import org.apache.samza.coordinator.MockSystemFactory
+import org.apache.samza.coordinator.stream.messages.SetMigrationMetaMessage
+import org.apache.samza.coordinator.stream.{MockCoordinatorStreamSystemFactory, CoordinatorStreamSystemFactory}
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.serializers.CheckpointSerde
+import org.apache.samza.storage.ChangelogPartitionManager
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.util.{ClientUtilTopicMetadataStore, TopicMetadataStore}
+import org.apache.samza.{Partition, SamzaException}
+import org.junit.Assert._
+import org.junit._
+import scala.collection.JavaConversions._
+import scala.collection._
+
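+// Exercises the legacy Kafka checkpoint topic and its auto-migration into the
+// coordinator stream, against embedded ZooKeeper and Kafka brokers.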
+class TestKafkaCheckpointManager {
+
+  val checkpointTopic = "checkpoint-topic"
+  val serdeCheckpointTopic = "checkpoint-topic-invalid-serde"
+  val checkpointTopicConfig = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(null)
+  val zkConnect: String = TestZKUtils.zookeeperConnect
+  var zkClient: ZkClient = null
+  val zkConnectionTimeout = 6000
+  val zkSessionTimeout = 6000
+
+  val brokerId1 = 0
+  val brokerId2 = 1
+  val brokerId3 = 2
+  val ports = TestUtils.choosePorts(3)
+  val (port1, port2, port3) = (ports(0), ports(1), ports(2))
+
+  val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
+  props1.put("controlled.shutdown.enable", "true")
+  val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
+  props2.put("controlled.shutdown.enable", "true")
+  val props3 = TestUtils.createBrokerConfig(brokerId3, port3)
+  props3.put("controlled.shutdown.enable", "true")
+
+  val config = new java.util.HashMap[String, Object]()
+  val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3)
+  config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
+  config.put("acks", "all")
+  config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
+  config.put(ProducerConfig.RETRIES_CONFIG, (java.lang.Integer.MAX_VALUE - 1).toString)
+  config.putAll(KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES)
+  val producerConfig = new KafkaProducerConfig("kafka", "i001", config)
+  val partition = new Partition(0)
+  val partition2 = new Partition(1)
+  val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "123"))
+  val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "12345"))
+  var zookeeper: EmbeddedZookeeper = null
+  var server1: KafkaServer = null
+  var server2: KafkaServer = null
+  var server3: KafkaServer = null
+  var metadataStore: TopicMetadataStore = null
+
+  val systemStreamPartitionGrouperFactoryString = classOf[GroupByPartitionFactory].getCanonicalName
+
+  @Before
+  def beforeSetupServers {
+    zookeeper = new EmbeddedZookeeper(zkConnect)
+    server1 = TestUtils.createServer(new KafkaConfig(props1))
+    server2 = TestUtils.createServer(new KafkaConfig(props2))
+    server3 = TestUtils.createServer(new KafkaConfig(props3))
+    metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
+  }
+
+  @After
+  def afterCleanLogDirs {
+    server1.shutdown
+    server1.awaitShutdown()
+    server2.shutdown
+    server2.awaitShutdown()
+    server3.shutdown
+    server3.awaitShutdown()
+    Utils.rm(server1.config.logDirs)
+    Utils.rm(server2.config.logDirs)
+    Utils.rm(server3.config.logDirs)
+    zookeeper.shutdown
+  }
+
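+  // Writes a checkpoint for the given task directly to partition 0 of the
+  // checkpoint topic, bypassing the checkpoint manager.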
+  private def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint, cpTopic: String = checkpointTopic) = {
+    val producer: Producer[Array[Byte], Array[Byte]] = new KafkaProducer(producerConfig.getProducerProperties)
+    val record = new ProducerRecord(
+      cpTopic,
+      0,
+      KafkaCheckpointLogKey.getCheckpointKey(taskName).toBytes(),
+      new CheckpointSerde().toBytes(checkpoint)
+    )
+    try {
+      producer.send(record).get()
+    } catch {
+      case e: Exception => println(e.getMessage)
+    } finally {
+      producer.close()
+    }
+  }
+
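+  // Writes the changelog partition mapping directly to partition 0 of the
+  // checkpoint topic.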
+  private def writeChangeLogPartitionMapping(changelogMapping: Map[TaskName, Integer], cpTopic: String = checkpointTopic) = {
+    val producer: Producer[Array[Byte], Array[Byte]] = new KafkaProducer(producerConfig.getProducerProperties)
+    val record = new ProducerRecord(
+      cpTopic,
+      0,
+      KafkaCheckpointLogKey.getChangelogPartitionMappingKey().toBytes(),
+      new CheckpointSerde().changelogPartitionMappingToBytes(changelogMapping)
+    )
+    try {
+      producer.send(record).get()
+    } catch {
+      case e: Exception => println(e.getMessage)
+    } finally {
+      producer.close()
+    }
+  }
+
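+  // Creates the checkpoint topic with a single partition and a replication
+  // factor of 1.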
+  private def createCheckpointTopic(cpTopic: String = checkpointTopic) = {
+    val zkClient = new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
+    try {
+      AdminUtils.createTopic(
+        zkClient,
+        cpTopic,
+        1,
+        1,
+        checkpointTopicConfig)
+    } catch {
+      case e: Exception => println(e.getMessage)
+    } finally {
+      zkClient.close
+    }
+  }
+
+  @Test
+  def testStartFailureWithNoCheckpointTopic() {
+    try {
+      val mapConfig = Map(
+        "task.checkpoint.factory" -> "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory",
+        JobConfig.JOB_NAME -> "test",
+        JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
+        JobConfig.JOB_CONTAINER_COUNT -> "2",
+        "task.inputs" -> "test.stream1",
+        "task.checkpoint.system" -> "test",
+        SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getCanonicalName,
+        SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName)
+      // Enable consumer caching
+      MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
+
+      val config = new MapConfig(mapConfig)
+      val migrate = new KafkaCheckpointMigration
+      val oldCheckpointManager = getKafkaCheckpointManager
+      oldCheckpointManager.start
+      fail("KafkaCheckpointManager start should have failed")
+    } catch {
+      case se: SamzaException => assertEquals(se.getMessage, "Failed to start KafkaCheckpointManager for non-existing checkpoint topic. " +
+        "KafkaCheckpointManager should only be used for migration purpose.")
+    }
+  }
+
+  @Test
+  def testMigrationWithNoCheckpointTopic() {
+    try {
+      val mapConfig = Map(
+        "task.checkpoint.factory" -> "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory",
+        JobConfig.JOB_NAME -> "test",
+        JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
+        JobConfig.JOB_CONTAINER_COUNT -> "2",
+        "task.inputs" -> "test.stream1",
+        "task.checkpoint.system" -> "test",
+        SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getCanonicalName,
+        SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName)
+      // Enable consumer caching
+      MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
+
+      val config = new MapConfig(mapConfig)
+      val migrate = new KafkaCheckpointMigration
+      val oldCheckpointManager = getKafkaCheckpointManager
+
+      // Write a couple of checkpoints in the old checkpoint topic
+      val task1 = new TaskName(partition.toString)
+      val task2 = new TaskName(partition2.toString)
+
+      val changelogMapping = Map[TaskName, Integer]()
+
+      // Initialize coordinator stream
+      val coordinatorFactory = new CoordinatorStreamSystemFactory()
+      val coordinatorSystemConsumer = coordinatorFactory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
+      val coordinatorSystemProducer = coordinatorFactory.getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap)
+      coordinatorSystemConsumer.register()
+      coordinatorSystemConsumer.start()
+
+      assertEquals(coordinatorSystemConsumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE).size, 0)
+      coordinatorSystemConsumer.stop
+
+      // Start the migration
+      def getManager() = getKafkaCheckpointManager
+      migrate.migrate(config, getManager)
+      // Ensure the migration step does not create the checkpoint topic when it does not already exist
+      assertFalse(getManager.topicExists)
+
+      // Verify if the checkpoints have been migrated
+      val newCheckpointManager = new CheckpointManager(coordinatorSystemProducer, coordinatorSystemConsumer, "test")
+      newCheckpointManager.register(task1)
+      newCheckpointManager.register(task2)
+      newCheckpointManager.start()
+      assertNull(newCheckpointManager.readLastCheckpoint(task1))
+      assertNull(newCheckpointManager.readLastCheckpoint(task2))
+      newCheckpointManager.stop()
+
+      // Verify if the changelogPartitionInfo has been migrated
+      val newChangelogManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, "test")
+      newChangelogManager.start
+      val newChangelogMapping = newChangelogManager.readChangeLogPartitionMapping()
+      newChangelogManager.stop
+      assertEquals(newChangelogMapping.toMap, changelogMapping)
+
+      // Check for migration message
+      coordinatorSystemConsumer.register()
+      coordinatorSystemConsumer.start()
+      assertEquals(coordinatorSystemConsumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE).size, 1)
+      coordinatorSystemConsumer.stop()
+    }
+    finally {
+      MockCoordinatorStreamSystemFactory.disableMockConsumerCache()
+    }
+  }
+
+  @Test
+  def testMigration() {
+    try {
+      val mapConfig = Map(
+        "task.checkpoint.factory" -> "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory",
+        JobConfig.JOB_NAME -> "test",
+        JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
+        JobConfig.JOB_CONTAINER_COUNT -> "2",
+        "task.inputs" -> "test.stream1",
+        "task.checkpoint.system" -> "test",
+        SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getCanonicalName,
+        SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName)
+      // Enable consumer caching
+      MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
+
+      val config = new MapConfig(mapConfig)
+      val migrate = new KafkaCheckpointMigration
+      val oldCheckpointManager = getKafkaCheckpointManager
+
+      createCheckpointTopic()
+      oldCheckpointManager.validateTopic
+
+      // Write a couple of checkpoints in the old checkpoint topic
+      val task1 = new TaskName(partition.toString)
+      val task2 = new TaskName(partition2.toString)
+      writeCheckpoint(task1, cp1)
+      writeCheckpoint(task2, cp2)
+
+      val changelogMapping = Map(task1 -> 1.asInstanceOf[Integer], task2 -> 10.asInstanceOf[Integer])
+      // Write changelog partition info to the old checkpoint topic
+      writeChangeLogPartitionMapping(changelogMapping)
+      oldCheckpointManager.stop
+
+      // Initialize coordinator stream
+      val coordinatorFactory = new CoordinatorStreamSystemFactory()
+      val coordinatorSystemConsumer = coordinatorFactory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
+      val coordinatorSystemProducer = coordinatorFactory.getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap)
+      coordinatorSystemConsumer.register()
+      coordinatorSystemConsumer.start()
+
+      assertEquals(coordinatorSystemConsumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE).size, 0)
+      coordinatorSystemConsumer.stop
+
+      // Start the migration
+      def getManager() = getKafkaCheckpointManager
+      migrate.migrate(config, getManager)
+
+      // Verify if the checkpoints have been migrated
+      val newCheckpointManager = new CheckpointManager(coordinatorSystemProducer, coordinatorSystemConsumer, "test")
+      newCheckpointManager.register(task1)
+      newCheckpointManager.register(task2)
+      newCheckpointManager.start()
+      assertEquals(cp1, newCheckpointManager.readLastCheckpoint(task1))
+      assertEquals(cp2, newCheckpointManager.readLastCheckpoint(task2))
+      newCheckpointManager.stop()
+
+      // Verify if the changelogPartitionInfo has been migrated
+      val newChangelogManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, "test")
+      newChangelogManager.start
+      val newChangelogMapping = newChangelogManager.readChangeLogPartitionMapping()
+      newChangelogManager.stop
+      assertEquals(newChangelogMapping.toMap, changelogMapping)
+
+      // Check for migration message
+      coordinatorSystemConsumer.register()
+      coordinatorSystemConsumer.start()
+      assertEquals(coordinatorSystemConsumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE).size, 1)
+      coordinatorSystemConsumer.stop()
+    }
+    finally {
+      MockCoordinatorStreamSystemFactory.disableMockConsumerCache()
+    }
+  }
+
+  @Test
+  def testCheckpointShouldBeNullIfCheckpointTopicDoesNotExistShouldBeCreatedOnWriteAndShouldBeReadableAfterWrite {
+    val kcm = getKafkaCheckpointManager
+    val taskName = new TaskName(partition.toString)
+    kcm.register(taskName)
+    createCheckpointTopic()
+    kcm.validateTopic
+    // check that log compaction is enabled.
+    val zkClient = new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
+    val topicConfig = AdminUtils.fetchTopicConfig(zkClient, checkpointTopic)
+    zkClient.close
+    assertEquals("compact", topicConfig.get("cleanup.policy"))
+    assertEquals("26214400", topicConfig.get("segment.bytes"))
+    // read before topic exists should result in a null checkpoint
+    var readCp = kcm.readLastCheckpoint(taskName)
+    assertNull(readCp)
+    // create topic the first time around
+    writeCheckpoint(taskName, cp1)
+    readCp = kcm.readLastCheckpoint(taskName)
+    assertEquals(cp1, readCp)
+    // should get an exception if partition doesn't exist
+    try {
+      readCp = kcm.readLastCheckpoint(new TaskName(new Partition(1).toString))
+      fail("Expected a SamzaException, since only one partition (partition 0) should exist.")
+    } catch {
+      case e: SamzaException => None // expected
+      case _: Exception => fail("Expected a SamzaException, since only one partition (partition 0) should exist.")
+    }
+    // writing a second message should work, too
+    writeCheckpoint(taskName, cp2)
+    readCp = kcm.readLastCheckpoint(taskName)
+    assertEquals(cp2, readCp)
+    kcm.stop
+  }
+
+  @Test
+  def testUnrecoverableKafkaErrorShouldThrowKafkaCheckpointException {
+    val exceptions = List("InvalidMessageException", "InvalidMessageSizeException", "UnknownTopicOrPartitionException")
+    exceptions.foreach { exceptionName =>
+      val kcm = getKafkaCheckpointManagerWithInvalidSerde(exceptionName)
+      val taskName = new TaskName(partition.toString)
+      kcm.register(taskName)
+      createCheckpointTopic(serdeCheckpointTopic)
+      kcm.validateTopic
+      writeCheckpoint(taskName, cp1, serdeCheckpointTopic)
+      // Because the serde throws unrecoverable errors, the read should result in a KafkaCheckpointException
+      try {
+        kcm.readLastCheckpoint(taskName)
+        fail("Expected a KafkaCheckpointException.")
+      } catch {
+        case e: KafkaCheckpointException => None
+      }
+      kcm.stop
+    }
+  }
+
+  private def getKafkaCheckpointManager = new KafkaCheckpointManager(
+    clientId = "some-client-id",
+    checkpointTopic = checkpointTopic,
+    systemName = "kafka",
+    socketTimeout = 30000,
+    bufferSize = 64 * 1024,
+    fetchSize = 300 * 1024,
+    metadataStore = metadataStore,
+    connectProducer = () => new KafkaProducer(producerConfig.getProducerProperties),
+    connectZk = () => new ZkClient(zkConnect, 60000, 60000, ZKStringSerializer),
+    systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
+    checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]())))
+
+  // Inject a serde that throws the given Kafka exception whenever fromBytes is called
+  private def getKafkaCheckpointManagerWithInvalidSerde(exception: String) = new KafkaCheckpointManager(
+    clientId = "some-client-id-invalid-serde",
+    checkpointTopic = serdeCheckpointTopic,
+    systemName = "kafka",
+    socketTimeout = 30000,
+    bufferSize = 64 * 1024,
+    fetchSize = 300 * 1024,
+    metadataStore = metadataStore,
+    connectProducer = () => new KafkaProducer(producerConfig.getProducerProperties),
+    connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer),
+    systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
+    serde = new InvalidSerde(exception),
+    checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]())))
+
+  class InvalidSerde(exception: String) extends CheckpointSerde {
+    override def fromBytes(bytes: Array[Byte]): Checkpoint = {
+      exception match {
+        case "InvalidMessageException" => throw new InvalidMessageException
+        case "InvalidMessageSizeException" => throw new InvalidMessageSizeException
+        case "UnknownTopicOrPartitionException" => throw new UnknownTopicOrPartitionException
+      }
+    }
+  }
+}