You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ma...@apache.org on 2014/04/02 21:10:15 UTC

git commit: SAMZA-180: Command-line tool for manipulating checkpoints. Reviewed by Chris Riccomini.

Repository: incubator-samza
Updated Branches:
  refs/heads/master 4d0ad620b -> 7d7ba7fbc


SAMZA-180: Command-line tool for manipulating checkpoints. Reviewed by Chris Riccomini.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/7d7ba7fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/7d7ba7fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/7d7ba7fb

Branch: refs/heads/master
Commit: 7d7ba7fbc1300c90ed762d8ee5bde800e72bd776
Parents: 4d0ad62
Author: Martin Kleppmann <mk...@linkedin.com>
Authored: Thu Mar 20 18:36:01 2014 +0000
Committer: Martin Kleppmann <mk...@linkedin.com>
Committed: Wed Apr 2 18:57:21 2014 +0100

----------------------------------------------------------------------
 README.md                                       |  15 ++
 build.gradle                                    |  34 +++-
 gradle/dependency-versions.gradle               |   2 +
 .../samza/checkpoint/CheckpointTool.scala       | 164 +++++++++++++++++++
 .../scala/org/apache/samza/job/JobRunner.scala  |  56 +------
 .../org/apache/samza/util/CommandLine.scala     |  73 +++++++++
 .../samza/checkpoint/TestCheckpointTool.scala   |  96 +++++++++++
 .../kafka/KafkaCheckpointManager.scala          |   2 +
 samza-shell/src/main/bash/checkpoint-tool.sh    |  19 +++
 9 files changed, 412 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7d7ba7fb/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 7a4a232..7d0ff10 100644
--- a/README.md
+++ b/README.md
@@ -34,6 +34,21 @@ To run a single test:
 
     ./gradlew clean :samza-test:test -Dtest.single=TestStatefulTask
 
+### Job Management
+
+To run a job (defined in a properties file):
+
+    ./gradlew samza-shell:runJob -PconfigPath=file:///path/to/job/config.properties
+
+To inspect a job's latest checkpoint:
+
+    ./gradlew samza-shell:checkpointTool -PconfigPath=file:///path/to/job/config.properties
+
+To modify a job's checkpoint, first stop the job (a job only reads its checkpoint at startup). Then give the tool a file with the new offset for each partition, in the format `systems.<system>.streams.<topic>.partitions.<partition>=<offset>`:
+
+    ./gradlew samza-shell:checkpointTool -PconfigPath=file:///path/to/job/config.properties \
+        -PnewOffsets=file:///path/to/new/offsets.properties
+
 #### Maven
 
 Samza uses Kafka, which is not managed by Maven. To use Kafka as though it were a Maven artifact, Samza installs Kafka into a local repository using the `mvn install` command. You must have Maven installed to build Samza.

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7d7ba7fb/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 4775087..75b6c98 100644
--- a/build.gradle
+++ b/build.gradle
@@ -62,6 +62,8 @@ project(":samza-core_$scalaVersion") {
     compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion"
     compile "org.codehaus.jackson:jackson-jaxrs:$jacksonVersion"
     testCompile "junit:junit:$junitVersion"
+    testCompile "org.mockito:mockito-all:$mockitoVersion"
+    testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
   }
 }
 
@@ -91,7 +93,7 @@ project(":samza-kafka_$scalaVersion") {
     testCompile files("lib/kafka_$scalaVersion-0.8.1-SNAPSHOT-test.jar")
 
     // Logging in tests is good.
-    testRuntime "org.slf4j:slf4j-simple:1.6.2"
+    testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"
   }
 
   test {
@@ -161,11 +163,39 @@ project(":samza-yarn_$scalaVersion") {
 project(":samza-shell") {
   apply plugin: 'java'
 
+  // Dependency-only configuration; it is used below as the classpath of the
+  // runJob and checkpointTool JavaExec tasks.
+  configurations {
+    gradleShell
+  }
+
+  dependencies {
+    gradleShell project(":samza-core_$scalaVersion")
+    gradleShell project(":samza-kafka_$scalaVersion")
+    gradleShell project(":samza-yarn_$scalaVersion")
+    // slf4j-simple is on the classpath so the tools' slf4j log output is printed.
+    gradleShell "org.slf4j:slf4j-simple:$slf4jVersion"
+  }
+
   task shellTarGz(type: Tar) { 
     compression = Compression.GZIP 
     classifier = 'dist'
     from 'src/main/bash'
   }
+
+  // Runs a job through org.apache.samza.job.JobRunner.
+  // Usage: ./gradlew samza-shell:runJob \
+  //    -PconfigPath=file:///path/to/job/config.properties
+  task runJob(type:JavaExec) {
+    main = 'org.apache.samza.job.JobRunner'
+    classpath = configurations.gradleShell
+    // -PconfigPath is forwarded as --config-path; without it JobRunner prints usage and exits.
+    if (project.hasProperty('configPath')) args += ['--config-path', configPath]
+  }
+
+  // Prints a job's current checkpoint through org.apache.samza.checkpoint.CheckpointTool,
+  // and overwrites it when -PnewOffsets is given (-PnewOffsets is optional).
+  // Usage: ./gradlew samza-shell:checkpointTool \
+  //    -PconfigPath=file:///path/to/job/config.properties -PnewOffsets=file:///path/to/new/offsets.properties
+  task checkpointTool(type:JavaExec) {
+    main = 'org.apache.samza.checkpoint.CheckpointTool'
+    classpath = configurations.gradleShell
+    // Each project property, when present, is forwarded as the matching tool option.
+    if (project.hasProperty('configPath')) args += ['--config-path', configPath]
+    if (project.hasProperty('newOffsets')) args += ['--new-offsets', newOffsets]
+  }
 }
 
 project(":samza-kv_$scalaVersion") {
@@ -207,7 +237,7 @@ project(":samza-test_$scalaVersion") {
     testCompile files("../samza-kafka/lib/kafka_$scalaVersion-0.8.1-SNAPSHOT-test.jar")
     testCompile "com.101tec:zkclient:$zkClientVersion"
     testCompile project(":samza-kafka_$scalaVersion")
-    testRuntime "org.slf4j:slf4j-simple:1.6.2"
+    testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"
   }
 
   test {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7d7ba7fb/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 612670d..6e51c04 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -3,6 +3,7 @@ ext {
   jacksonVersion = "1.8.5"
   junitVersion = "4.8.1"
   mockitoVersion = "1.8.4"
+  scalaTestVersion = "2.1.0"
   zkClientVersion = "0.3"
   zookeeperVersion = "3.3.4"
   metricsVersion = "2.2.0"
@@ -10,4 +11,5 @@ ext {
   commonsHttpClientVersion = "3.1"
   leveldbVersion = "1.8"
   yarnVersion = "2.2.0"
+  slf4jVersion = "1.6.2"
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7d7ba7fb/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
new file mode 100644
index 0000000..5735a39
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint
+
+import java.net.URI
+import java.util.regex.Pattern
+import joptsimple.OptionSet
+import org.apache.samza.{Partition, SamzaException}
+import org.apache.samza.config.{Config, StreamConfig}
+import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.util.{CommandLine, Util}
+import scala.collection.JavaConversions._
+import grizzled.slf4j.Logging
+
+/**
+ * Command-line tool for inspecting and manipulating the checkpoints of a job.
+ * One use is forcing a job to re-process a stream from the beginning.
+ *
+ * Invoke it with the configuration URI of the job to inspect or modify. It
+ * prints the job's latest checkpoint, i.e. the latest offset of each partition
+ * of each input stream.
+ *
+ * To overwrite the checkpoint, also pass a second properties file containing
+ * the desired offsets. It must use the same format in which the tool prints
+ * the current checkpoint:
+ *
+ *   systems.<system>.streams.<topic>.partitions.<partition>=<offset>
+ *
+ * NOTE: A job reads its checkpoint only when it starts up. For a change to take
+ * effect, first stop the job, then write the new checkpoint, then start the job
+ * again. Writing a new checkpoint while the job is running may have no effect.
+ *
+ * When building Samza from source, the 'checkpointTool' gradle task is a
+ * shortcut for running this tool.
+ */
+object CheckpointTool {
+  /** Format string that renders a SystemStreamPartition as a properties key. */
+  val SSP_PATTERN = StreamConfig.STREAM_PREFIX + "partitions.%d"
+  /** Parses a properties key back into (system, stream, partition id) groups. */
+  val SSP_REGEX = Pattern.compile("systems\\.(.+)\\.streams\\.(.+)\\.partitions\\.([0-9]+)")
+
+  /** The standard CommandLine options plus an optional --new-offsets URI. */
+  class CheckpointToolCommandLine extends CommandLine with Logging {
+    val newOffsetsOpt =
+      parser.accepts("new-offsets", "URI of file (e.g. file:///some/local/path.properties) " +
+                                    "containing offsets to write to the job's checkpoint topic. " +
+                                    "If not given, this tool prints out the current offsets.")
+            .withRequiredArg
+            .ofType(classOf[URI])
+            .describedAs("path")
+
+    /** Offsets read from --new-offsets by loadConfig; remains null when the option is absent. */
+    var newOffsets: Map[SystemStreamPartition, String] = null
+
+    /**
+     * Maps every key matching SSP_REGEX to a SystemStreamPartition paired with
+     * its offset. Keys that do not match are logged and skipped.
+     */
+    def parseOffsets(propertiesFile: Config): Map[SystemStreamPartition, String] = {
+      val offsets = Map.newBuilder[SystemStreamPartition, String]
+      for (entry <- propertiesFile.entrySet) {
+        val key = entry.getKey
+        val value = entry.getValue
+        val matcher = SSP_REGEX.matcher(key)
+        if (!matcher.matches) {
+          warn("Warning: ignoring unrecognised property: %s = %s" format (key, value))
+        } else {
+          val partition = new Partition(Integer.parseInt(matcher.group(3)))
+          val ssp = new SystemStreamPartition(matcher.group(1), matcher.group(2), partition)
+          offsets += ((ssp, value))
+        }
+      }
+      offsets.result
+    }
+
+    /** Loads the job config and, when --new-offsets is present, parses the offsets file too. */
+    override def loadConfig(options: OptionSet) = {
+      val jobConfig = super.loadConfig(options)
+      if (options.has(newOffsetsOpt)) {
+        newOffsets = parseOffsets(configFactory.getConfig(options.valueOf(newOffsetsOpt)))
+      }
+      jobConfig
+    }
+  }
+
+  def main(args: Array[String]) {
+    val commandLine = new CheckpointToolCommandLine
+    val jobConfig = commandLine.loadConfig(commandLine.parser.parse(args: _*))
+    new CheckpointTool(jobConfig, commandLine.newOffsets).run
+  }
+}
+
+/**
+ * Reads, logs and optionally overwrites the checkpoint of one job.
+ *
+ * @param config     the job's configuration; task.checkpoint.factory must be set.
+ * @param newOffsets offsets to write, or null to only print the current checkpoint.
+ */
+class CheckpointTool(config: Config, newOffsets: Map[SystemStreamPartition, String]) extends Logging {
+  val manager = config.getCheckpointManagerFactory match {
+    case Some(className) =>
+      Util.getObj[CheckpointManagerFactory](className).getCheckpointManager(config, new MetricsRegistryMap)
+    case _ =>
+      throw new SamzaException("This job does not use checkpointing (task.checkpoint.factory is not set).")
+  }
+
+  // The CheckpointManagerFactory needs to perform this same operation when initializing
+  // the manager. TODO figure out some way of avoiding duplicated work.
+  val partitions = Util.getInputStreamPartitions(config).map(_.getPartition).toSet
+
+  /**
+   * Registers every input partition, logs the current checkpoint and, if new
+   * offsets were supplied, logs and writes them. The manager is always stopped
+   * once started: in read-only mode, after writing, and when reading or
+   * writing throws.
+   */
+  def run {
+    info("Using %s" format manager)
+    partitions.foreach(manager.register)
+    manager.start
+    try {
+      logCheckpoint(readLastCheckpoint, "Current checkpoint")
+
+      if (newOffsets != null) {
+        logCheckpoint(newOffsets, "New offset to be written")
+        writeNewCheckpoint(newOffsets)
+      }
+    } finally {
+      // Previously stop was only reached on the write path, leaking the manager otherwise.
+      manager.stop
+    }
+
+    if (newOffsets != null) {
+      info("Ok, new checkpoint has been written.")
+    }
+  }
+
+  /**
+   * Load the most recent checkpoint state for all partitions. Partitions for
+   * which the manager returns no checkpoint (null) contribute no entries.
+   */
+  def readLastCheckpoint: Map[SystemStreamPartition, String] = {
+    partitions.toSeq.flatMap(partition => {
+      val checkpoint = manager.readLastCheckpoint(partition)
+      if (checkpoint == null) {
+        info("No checkpoint found for %s" format partition)
+        Seq.empty[(SystemStreamPartition, String)]
+      } else {
+        checkpoint.getOffsets.toSeq.map { case (systemStream, offset) =>
+          (new SystemStreamPartition(systemStream, partition), offset)
+        }
+      }
+    }).toMap
+  }
+
+  /**
+   * Store a new checkpoint state for all given partitions, overwriting the
+   * current state. One checkpoint is written per partition, containing the
+   * offsets of every stream given for that partition. Any partitions that are
+   * not mentioned will not be changed.
+   */
+  def writeNewCheckpoint(newOffsets: Map[SystemStreamPartition, String]) {
+    newOffsets.groupBy(_._1.getPartition).foreach {
+      case (partition, offsets) =>
+        val streamOffsets = offsets.map { case (ssp, offset) => ssp.getSystemStream -> offset }.toMap
+        val checkpoint = new Checkpoint(streamOffsets)
+        manager.writeCheckpoint(partition, checkpoint)
+    }
+  }
+
+  /** Logs one sorted line per partition, in the same key format as the offsets file. */
+  def logCheckpoint(checkpoint: Map[SystemStreamPartition, String], prefix: String) {
+    checkpoint.map { case (ssp, offset) =>
+      (CheckpointTool.SSP_PATTERN + " = %s") format (ssp.getSystem, ssp.getStream, ssp.getPartition.getPartitionId, offset)
+    }.toList.sorted.foreach(line => info(prefix + ": " + line))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7d7ba7fb/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index f3a75af..19c9538 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -19,60 +19,22 @@
 
 package org.apache.samza.job
 
-import java.lang.String
-import java.net.URI
-import org.apache.samza.job.ApplicationStatus.Running
-import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config._
-import grizzled.slf4j.Logging
-import joptsimple.OptionParser
-import joptsimple.util.KeyValuePair
 import org.apache.samza.SamzaException
+import org.apache.samza.config.{Config, ConfigRewriter}
+import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config.factories.PropertiesConfigFactory
-import scala.Some
+import org.apache.samza.job.ApplicationStatus.Running
 import org.apache.samza.util.Util
-import scala.collection.mutable.Buffer
+import org.apache.samza.util.CommandLine
+import grizzled.slf4j.Logging
 import scala.collection.JavaConversions._
 
 object JobRunner extends Logging {
+  /**
+   * Entry point: parses the standard Samza options (see org.apache.samza.util.CommandLine),
+   * loads the job config they describe and runs the job with it.
+   */
   def main(args: Array[String]) {
-    // Define parameters.
-    var parser = new OptionParser()
-    val configFactoryOpt = 
-      parser.accepts("config-factory", "The config factory to use to read your config file.")
-            .withRequiredArg
-            .ofType(classOf[java.lang.String])
-            .describedAs("com.foo.bar.ClassName")
-            .defaultsTo(classOf[PropertiesConfigFactory].getName)
-    val configPathOpt =
-      parser.accepts("config-path", "URI location to a config file (e.g. file:///some/local/path.properties). " + 
-                                    "If multiple files are given they are all used with later files overriding any values that appear in earlier files.")
-            .withRequiredArg
-            .ofType(classOf[URI])
-            .describedAs("path")
-    val configOverrideOpt = 
-      parser.accepts("config", "A configuration value in the form key=value. Command line properties override any configuration values given.")
-            .withRequiredArg
-            .ofType(classOf[KeyValuePair])
-            .describedAs("key=value")
-    var options = parser.parse(args: _*)
-
-    // Verify legitimate parameters.
-    if (!options.has(configPathOpt)) {
-      parser.printHelpOn(System.err)
-      System.exit(-1)
-    }
-
-    // Set up the job parameters.
-    val configFactoryClass = options.valueOf(configFactoryOpt)
-    val configPaths = options.valuesOf(configPathOpt)
-    val configFactory = Class.forName(configFactoryClass).newInstance.asInstanceOf[ConfigFactory]
-    val configOverrides = options.valuesOf(configOverrideOpt).map(kv => (kv.key, kv.value)).toMap
-    
-    val configs: Buffer[java.util.Map[String, String]] = configPaths.map(configFactory.getConfig)
-    configs += configOverrides
-
-    new JobRunner(new MapConfig(configs)).run
+    // Option definitions and config loading are shared with other tools via CommandLine.
+    val cmdline = new CommandLine
+    val options = cmdline.parser.parse(args: _*)
+    val config = cmdline.loadConfig(options)
+    new JobRunner(config).run
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7d7ba7fb/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala b/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala
new file mode 100644
index 0000000..f26501b
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util
+
+import java.net.URI
+import joptsimple.{OptionParser, OptionSet}
+import joptsimple.util.KeyValuePair
+import org.apache.samza.config.{ConfigFactory, MapConfig}
+import org.apache.samza.config.factories.PropertiesConfigFactory
+import scala.collection.mutable.Buffer
+import scala.collection.JavaConversions._
+
+/**
+ * The command-line options shared by Samza tools: a config factory class, one
+ * or more config file URIs, and key=value overrides. Tools use this class as-is
+ * or extend it with options of their own.
+ */
+class CommandLine {
+  val parser = new OptionParser()
+  val configFactoryOpt =
+    parser.accepts("config-factory", "The config factory to use to read your config file.")
+          .withRequiredArg
+          .ofType(classOf[java.lang.String])
+          .describedAs("com.foo.bar.ClassName")
+          .defaultsTo(classOf[PropertiesConfigFactory].getName)
+  val configPathOpt =
+    parser.accepts("config-path", "URI location to a config file (e.g. file:///some/local/path.properties). " +
+                                  "If multiple files are given they are all used with later files overriding any values that appear in earlier files.")
+          .withRequiredArg
+          .ofType(classOf[URI])
+          .describedAs("path")
+  val configOverrideOpt =
+    parser.accepts("config", "A configuration value in the form key=value. Command line properties override any configuration values given.")
+          .withRequiredArg
+          .ofType(classOf[KeyValuePair])
+          .describedAs("key=value")
+
+  /** Factory created by the most recent loadConfig call; null until then. */
+  var configFactory: ConfigFactory = null
+
+  /**
+   * Builds a config from the parsed options. Every --config-path is read with
+   * the --config-factory class, in order, and the --config overrides are added
+   * last. If no --config-path was given, prints usage to stderr and exits the
+   * JVM with status -1.
+   */
+  def loadConfig(options: OptionSet) = {
+    val hasConfigPath = options.has(configPathOpt)
+    if (!hasConfigPath) {
+      parser.printHelpOn(System.err)
+      System.exit(-1)
+    }
+
+    val paths = options.valuesOf(configPathOpt)
+    configFactory = Class.forName(options.valueOf(configFactoryOpt)).newInstance.asInstanceOf[ConfigFactory]
+    val overrides = options.valuesOf(configOverrideOpt).map(pair => (pair.key, pair.value)).toMap
+
+    // Later layers win, so the command-line overrides are appended last.
+    val layers: Buffer[java.util.Map[String, String]] = paths.map(configFactory.getConfig)
+    layers += overrides
+    new MapConfig(layers)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7d7ba7fb/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
new file mode 100644
index 0000000..bc54f9e
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint
+
+import org.junit.{Before, Test}
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.junit.AssertionsForJUnit
+import org.scalatest.mock.MockitoSugar
+import scala.collection.JavaConversions._
+import org.apache.samza.Partition
+import org.apache.samza.checkpoint.TestCheckpointTool.{MockCheckpointManagerFactory, MockSystemFactory}
+import org.apache.samza.config.{Config, MapConfig, SystemConfig, TaskConfig}
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.system.{SystemAdmin, SystemConsumer, SystemFactory, SystemProducer, SystemStream, SystemStreamMetadata, SystemStreamPartition}
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+
+/**
+ * Shared state for TestCheckpointTool. The job config names the factories below
+ * by class name, so they are instantiated reflectively. Each one simply returns
+ * whatever mock the test stored in the corresponding var.
+ */
+object TestCheckpointTool {
+  var checkpointManager: CheckpointManager = null
+  var systemConsumer: SystemConsumer = null
+  var systemProducer: SystemProducer = null
+  var systemAdmin: SystemAdmin = null
+
+  /** Returns the checkpointManager mock set by the test. */
+  class MockCheckpointManagerFactory extends CheckpointManagerFactory {
+    override def getCheckpointManager(config: Config, registry: MetricsRegistry) = checkpointManager
+  }
+
+  /** Returns the consumer/producer/admin mocks set by the test (null if unset). */
+  class MockSystemFactory extends SystemFactory {
+    override def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = systemConsumer
+    override def getProducer(systemName: String, config: Config, registry: MetricsRegistry) = systemProducer
+    override def getAdmin(systemName: String, config: Config) = systemAdmin
+  }
+}
+
+/**
+ * Unit tests for CheckpointTool, run against a mocked CheckpointManager and
+ * SystemAdmin.
+ */
+class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
+  var config: MapConfig = null
+
+  /**
+   * Configures a job whose input is test.foo. Mocked stream metadata gives
+   * test.foo partitions 0 and 1. The mocked checkpoint manager reports offset
+   * 1234 for partition 0 and 4321 for partition 1.
+   */
+  @Before
+  def setup {
+    config = new MapConfig(Map(
+      TaskConfig.INPUT_STREAMS -> "test.foo",
+      TaskConfig.CHECKPOINT_MANAGER_FACTORY -> classOf[MockCheckpointManagerFactory].getName,
+      SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getName
+    ))
+    val metadata = new SystemStreamMetadata("foo", Map[Partition, SystemStreamPartitionMetadata](
+      new Partition(0) -> new SystemStreamPartitionMetadata("0", "100", "101"),
+      new Partition(1) -> new SystemStreamPartitionMetadata("0", "200", "201")
+    ))
+
+    TestCheckpointTool.checkpointManager = mock[CheckpointManager]
+    TestCheckpointTool.systemAdmin = mock[SystemAdmin]
+    when(TestCheckpointTool.systemAdmin.getSystemStreamMetadata(Set("foo")))
+      .thenReturn(Map("foo" -> metadata))
+    when(TestCheckpointTool.checkpointManager.readLastCheckpoint(new Partition(0)))
+      .thenReturn(new Checkpoint(Map(new SystemStream("test", "foo") -> "1234")))
+    when(TestCheckpointTool.checkpointManager.readLastCheckpoint(new Partition(1)))
+      .thenReturn(new Checkpoint(Map(new SystemStream("test", "foo") -> "4321")))
+  }
+
+  /** Without new offsets, both partitions' checkpoints are read and nothing is written. */
+  @Test
+  def testReadLatestCheckpoint {
+    new CheckpointTool(config, null).run
+    verify(TestCheckpointTool.checkpointManager).readLastCheckpoint(new Partition(0))
+    verify(TestCheckpointTool.checkpointManager).readLastCheckpoint(new Partition(1))
+    verify(TestCheckpointTool.checkpointManager, never()).writeCheckpoint(any(), any())
+  }
+
+  /** With new offsets, one checkpoint per partition is written holding that partition's offset. */
+  @Test
+  def testOverwriteCheckpoint {
+    new CheckpointTool(config, Map(
+      new SystemStreamPartition("test", "foo", new Partition(0)) -> "42",
+      new SystemStreamPartition("test", "foo", new Partition(1)) -> "43"
+    )).run
+    verify(TestCheckpointTool.checkpointManager)
+      .writeCheckpoint(new Partition(0), new Checkpoint(Map(new SystemStream("test", "foo") -> "42")))
+    verify(TestCheckpointTool.checkpointManager)
+      .writeCheckpoint(new Partition(1), new Checkpoint(Map(new SystemStream("test", "foo") -> "43")))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7d7ba7fb/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index e98b50a..fed6eee 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -270,6 +270,8 @@ class KafkaCheckpointManager(
       }
     )
   }
+
+  /** Identifies this manager's checkpoint system and topic, e.g. in log output. */
+  override def toString =
+    "KafkaCheckpointManager [systemName=%s, checkpointTopic=%s]".format(systemName, checkpointTopic)
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7d7ba7fb/samza-shell/src/main/bash/checkpoint-tool.sh
----------------------------------------------------------------------
diff --git a/samza-shell/src/main/bash/checkpoint-tool.sh b/samza-shell/src/main/bash/checkpoint-tool.sh
new file mode 100755
index 0000000..1a455df
--- /dev/null
+++ b/samza-shell/src/main/bash/checkpoint-tool.sh
@@ -0,0 +1,19 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Runs org.apache.samza.checkpoint.CheckpointTool via run-class.sh (which must sit
+# next to this script), forwarding all arguments unchanged, e.g.
+#   checkpoint-tool.sh --config-path file:///path/to/job.properties [--new-offsets file:///path/to/offsets.properties]
+# "$@" and "$0" are quoted so paths and arguments containing spaces or glob
+# characters are passed through intact.
+exec "$(dirname "$0")"/run-class.sh org.apache.samza.checkpoint.CheckpointTool "$@"