You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ma...@apache.org on 2014/04/02 21:10:15 UTC
git commit: SAMZA-180: Command-line tool for manipulating
checkpoints. Reviewed by Chris Riccomini.
Repository: incubator-samza
Updated Branches:
refs/heads/master 4d0ad620b -> 7d7ba7fbc
SAMZA-180: Command-line tool for manipulating checkpoints. Reviewed by Chris Riccomini.
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/7d7ba7fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/7d7ba7fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/7d7ba7fb
Branch: refs/heads/master
Commit: 7d7ba7fbc1300c90ed762d8ee5bde800e72bd776
Parents: 4d0ad62
Author: Martin Kleppmann <mk...@linkedin.com>
Authored: Thu Mar 20 18:36:01 2014 +0000
Committer: Martin Kleppmann <mk...@linkedin.com>
Committed: Wed Apr 2 18:57:21 2014 +0100
----------------------------------------------------------------------
README.md | 15 ++
build.gradle | 34 +++-
gradle/dependency-versions.gradle | 2 +
.../samza/checkpoint/CheckpointTool.scala | 164 +++++++++++++++++++
.../scala/org/apache/samza/job/JobRunner.scala | 56 +------
.../org/apache/samza/util/CommandLine.scala | 73 +++++++++
.../samza/checkpoint/TestCheckpointTool.scala | 96 +++++++++++
.../kafka/KafkaCheckpointManager.scala | 2 +
samza-shell/src/main/bash/checkpoint-tool.sh | 19 +++
9 files changed, 412 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7d7ba7fb/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 7a4a232..7d0ff10 100644
--- a/README.md
+++ b/README.md
@@ -34,6 +34,21 @@ To run a single test:
./gradlew clean :samza-test:test -Dtest.single=TestStatefulTask
+### Job Management
+
+To run a job (defined in a properties file):
+
+ ./gradlew samza-shell:runJob -PconfigPath=file:///path/to/job/config.properties
+
+To inspect a job's latest checkpoint:
+
+ ./gradlew samza-shell:checkpointTool -PconfigPath=file:///path/to/job/config.properties
+
+To modify a job's checkpoint, first stop the job (a job only reads its checkpoint at startup), then pass the tool a properties file containing the new offset for each partition, in the format `systems.<system>.streams.<topic>.partitions.<partition>=<offset>`:
+
+ ./gradlew samza-shell:checkpointTool -PconfigPath=file:///path/to/job/config.properties \
+ -PnewOffsets=file:///path/to/new/offsets.properties
+
#### Maven
Samza uses Kafka, which is not managed by Maven. To use Kafka as though it were a Maven artifact, Samza installs Kafka into a local repository using the `mvn install` command. You must have Maven installed to build Samza.
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7d7ba7fb/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 4775087..75b6c98 100644
--- a/build.gradle
+++ b/build.gradle
@@ -62,6 +62,8 @@ project(":samza-core_$scalaVersion") {
compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion"
compile "org.codehaus.jackson:jackson-jaxrs:$jacksonVersion"
testCompile "junit:junit:$junitVersion"
+ testCompile "org.mockito:mockito-all:$mockitoVersion"
+ testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
}
}
@@ -91,7 +93,7 @@ project(":samza-kafka_$scalaVersion") {
testCompile files("lib/kafka_$scalaVersion-0.8.1-SNAPSHOT-test.jar")
// Logging in tests is good.
- testRuntime "org.slf4j:slf4j-simple:1.6.2"
+ testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"
}
test {
@@ -161,11 +163,39 @@ project(":samza-yarn_$scalaVersion") {
project(":samza-shell") {
apply plugin: 'java'
+ // 'gradleShell' is the classpath of the runJob/checkpointTool JavaExec tasks
+ // below: the core, Kafka and YARN modules, plus slf4j-simple as the SLF4J
+ // logging backend.
+ configurations {
+ gradleShell
+ }
+
+ dependencies {
+ gradleShell project(":samza-core_$scalaVersion")
+ gradleShell project(":samza-kafka_$scalaVersion")
+ gradleShell project(":samza-yarn_$scalaVersion")
+ gradleShell "org.slf4j:slf4j-simple:$slf4jVersion"
+ }
+
task shellTarGz(type: Tar) {
compression = Compression.GZIP
classifier = 'dist'
from 'src/main/bash'
}
+
+ // Usage: ./gradlew samza-shell:runJob \
+ // -PconfigPath=file:///path/to/job/config.properties
+ // If -PconfigPath is omitted no --config-path argument is passed, and
+ // JobRunner prints its usage and exits.
+ task runJob(type:JavaExec) {
+ main = 'org.apache.samza.job.JobRunner'
+ classpath = configurations.gradleShell
+ if (project.hasProperty('configPath')) args += ['--config-path', configPath]
+ }
+
+ // Usage: ./gradlew samza-shell:checkpointTool \
+ // -PconfigPath=file:///path/to/job/config.properties -PnewOffsets=file:///path/to/new/offsets.properties
+ // -PnewOffsets is optional; without it the tool only logs the current checkpoint.
+ task checkpointTool(type:JavaExec) {
+ main = 'org.apache.samza.checkpoint.CheckpointTool'
+ classpath = configurations.gradleShell
+ if (project.hasProperty('configPath')) args += ['--config-path', configPath]
+ if (project.hasProperty('newOffsets')) args += ['--new-offsets', newOffsets]
+ }
}
project(":samza-kv_$scalaVersion") {
@@ -207,7 +237,7 @@ project(":samza-test_$scalaVersion") {
testCompile files("../samza-kafka/lib/kafka_$scalaVersion-0.8.1-SNAPSHOT-test.jar")
testCompile "com.101tec:zkclient:$zkClientVersion"
testCompile project(":samza-kafka_$scalaVersion")
- testRuntime "org.slf4j:slf4j-simple:1.6.2"
+ testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"
}
test {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7d7ba7fb/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 612670d..6e51c04 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -3,6 +3,7 @@ ext {
jacksonVersion = "1.8.5"
junitVersion = "4.8.1"
mockitoVersion = "1.8.4"
+ scalaTestVersion = "2.1.0"
zkClientVersion = "0.3"
zookeeperVersion = "3.3.4"
metricsVersion = "2.2.0"
@@ -10,4 +11,5 @@ ext {
commonsHttpClientVersion = "3.1"
leveldbVersion = "1.8"
yarnVersion = "2.2.0"
+ slf4jVersion = "1.6.2"
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7d7ba7fb/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
new file mode 100644
index 0000000..5735a39
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint
+
+import java.net.URI
+import java.util.regex.Pattern
+import joptsimple.OptionSet
+import org.apache.samza.{Partition, SamzaException}
+import org.apache.samza.config.{Config, StreamConfig}
+import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.util.{CommandLine, Util}
+import scala.collection.JavaConversions._
+import grizzled.slf4j.Logging
+
+/**
+ * Command-line tool for inspecting and manipulating the checkpoints for a job.
+ * This can be used, for example, to force a job to re-process a stream from the
+ * beginning.
+ *
+ * When running this tool, you need to provide the configuration URI of the job
+ * you want to inspect or manipulate. The tool logs the latest checkpoint for
+ * that job (the latest offset of each partition of each input stream).
+ *
+ * To update the checkpoint, also provide a second properties file (via
+ * --new-offsets) containing the offsets you want, in the same format that the
+ * tool uses when it logs the current checkpoint:
+ *
+ *   systems.<system>.streams.<topic>.partitions.<partition>=<offset>
+ *
+ * NOTE: A job only reads its checkpoint when it starts up. Therefore, if you want
+ * your checkpoint change to take effect, you have to first stop the job, then
+ * write a new checkpoint, then start it up again. Writing a new checkpoint while
+ * the job is running may not have any effect.
+ *
+ * If you're building Samza from source, you can use the 'checkpointTool' gradle
+ * task as a short-cut to running this tool.
+ */
+object CheckpointTool {
+  /** printf-style template for an SSP key in a properties file; arguments are system, stream and partition id. */
+  val SSP_PATTERN = StreamConfig.STREAM_PREFIX + "partitions.%d"
+
+  /** Matches SSP keys; capture groups are (1) system, (2) stream, (3) numeric partition id. */
+  val SSP_REGEX = Pattern.compile("systems\\.(.+)\\.streams\\.(.+)\\.partitions\\.([0-9]+)")
+
+  /**
+   * The standard Samza command-line options plus --new-offsets. Once loadConfig
+   * has run, newOffsets holds the parsed offsets, or null if the option was absent.
+   */
+  class CheckpointToolCommandLine extends CommandLine with Logging {
+    val newOffsetsOpt =
+      parser
+        .accepts("new-offsets", "URI of file (e.g. file:///some/local/path.properties) " +
+          "containing offsets to write to the job's checkpoint topic. " +
+          "If not given, this tool prints out the current offsets.")
+        .withRequiredArg
+        .ofType(classOf[URI])
+        .describedAs("path")
+
+    var newOffsets: Map[SystemStreamPartition, String] = null
+
+    /**
+     * Converts every SSP-shaped property into an (SSP -> offset) entry.
+     * Properties whose key does not match SSP_REGEX are logged and skipped.
+     */
+    def parseOffsets(propertiesFile: Config): Map[SystemStreamPartition, String] = {
+      val offsets = for {
+        entry <- propertiesFile.entrySet
+        ssp <- parseSystemStreamPartition(entry.getKey, entry.getValue)
+      } yield ssp -> entry.getValue
+      offsets.toMap
+    }
+
+    /** Decodes one property key into an SSP, or warns and returns None if the key is not an offset key. */
+    private def parseSystemStreamPartition(key: String, value: String): Option[SystemStreamPartition] = {
+      val matcher = SSP_REGEX.matcher(key)
+      if (!matcher.matches) {
+        warn("Warning: ignoring unrecognised property: %s = %s" format (key, value))
+        None
+      } else {
+        val partitionId = Integer.parseInt(matcher.group(3))
+        Some(new SystemStreamPartition(matcher.group(1), matcher.group(2), new Partition(partitionId)))
+      }
+    }
+
+    /** Loads the job config as usual, then reads and parses the --new-offsets file if one was given. */
+    override def loadConfig(options: OptionSet) = {
+      val jobConfig = super.loadConfig(options)
+      if (options.has(newOffsetsOpt)) {
+        newOffsets = parseOffsets(configFactory.getConfig(options.valueOf(newOffsetsOpt)))
+      }
+      jobConfig
+    }
+  }
+
+  def main(args: Array[String]) {
+    val commandLine = new CheckpointToolCommandLine
+    val jobConfig = commandLine.loadConfig(commandLine.parser.parse(args: _*))
+    new CheckpointTool(jobConfig, commandLine.newOffsets).run
+  }
+}
+
+/**
+ * Reads, logs and optionally overwrites the checkpoints of the job described by
+ * `config`.
+ *
+ * @param config     the job's configuration; must set task.checkpoint.factory
+ * @param newOffsets offsets to write, keyed by SystemStreamPartition, or null to
+ *                   only log the current checkpoint
+ */
+class CheckpointTool(config: Config, newOffsets: Map[SystemStreamPartition, String]) extends Logging {
+  val manager = config.getCheckpointManagerFactory match {
+    case Some(className) =>
+      Util.getObj[CheckpointManagerFactory](className).getCheckpointManager(config, new MetricsRegistryMap)
+    case _ =>
+      throw new SamzaException("This job does not use checkpointing (task.checkpoint.factory is not set).")
+  }
+
+  // The CheckpointManagerFactory needs to perform this same operation when initializing
+  // the manager. TODO figure out some way of avoiding duplicated work.
+  val partitions = Util.getInputStreamPartitions(config).map(_.getPartition).toSet
+
+  /**
+   * Registers every input partition with the manager, starts it, logs the
+   * current checkpoint and, if newOffsets is non-null, writes the new offsets.
+   * The manager is stopped in a finally block, so it is released on the
+   * read-only path too, and when reading or writing throws.
+   */
+  def run {
+    info("Using %s" format manager)
+    partitions.foreach(manager.register)
+    manager.start
+    try {
+      logCheckpoint(readLastCheckpoint, "Current checkpoint")
+
+      if (newOffsets != null) {
+        logCheckpoint(newOffsets, "New offset to be written")
+        writeNewCheckpoint(newOffsets)
+      }
+    } finally {
+      manager.stop
+    }
+
+    // Only report success once the manager has been stopped cleanly.
+    if (newOffsets != null) {
+      info("Ok, new checkpoint has been written.")
+    }
+  }
+
+  /**
+   * Loads the most recent checkpoint of every registered partition and flattens
+   * it into a map from SystemStreamPartition to offset.
+   *
+   * A partition for which the manager returns null (no checkpoint recorded yet,
+   * e.g. a job that has never run) is logged and skipped rather than causing a
+   * NullPointerException.
+   */
+  def readLastCheckpoint: Map[SystemStreamPartition, String] = {
+    partitions.flatMap { partition =>
+      val checkpoint = manager.readLastCheckpoint(partition)
+      if (checkpoint == null) {
+        info("No checkpoint found for partition %s" format partition)
+        Nil
+      } else {
+        checkpoint.getOffsets.toList.map { case (systemStream, offset) =>
+          new SystemStreamPartition(systemStream, partition) -> offset
+        }
+      }
+    }.toMap
+  }
+
+  /**
+   * Writes a new checkpoint for each partition mentioned in `newOffsets`,
+   * overwriting that partition's current state. Partitions that are not
+   * mentioned are not changed.
+   *
+   * Note: the Checkpoint written for a partition contains only the streams
+   * listed for that partition, so (unless the manager merges checkpoints)
+   * offsets of other input streams on that partition are not carried over.
+   * List every stream of each partition you change.
+   */
+  def writeNewCheckpoint(newOffsets: Map[SystemStreamPartition, String]) {
+    newOffsets.groupBy(_._1.getPartition).foreach {
+      case (partition, offsets) =>
+        val streamOffsets = offsets.map { case (ssp, offset) => ssp.getSystemStream -> offset }.toMap
+        val checkpoint = new Checkpoint(streamOffsets)
+        manager.writeCheckpoint(partition, checkpoint)
+    }
+  }
+
+  /**
+   * Logs one line per entry, formatted with CheckpointTool.SSP_PATTERN as
+   * "<key> = <offset>", sorted, and each preceded by "<prefix>: ".
+   */
+  def logCheckpoint(checkpoint: Map[SystemStreamPartition, String], prefix: String) {
+    checkpoint.map { case (ssp, offset) =>
+      (CheckpointTool.SSP_PATTERN + " = %s") format (ssp.getSystem, ssp.getStream, ssp.getPartition.getPartitionId, offset)
+    }.toList.sorted.foreach(line => info(prefix + ": " + line))
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7d7ba7fb/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index f3a75af..19c9538 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -19,60 +19,22 @@
package org.apache.samza.job
-import java.lang.String
-import java.net.URI
-import org.apache.samza.job.ApplicationStatus.Running
-import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config._
-import grizzled.slf4j.Logging
-import joptsimple.OptionParser
-import joptsimple.util.KeyValuePair
import org.apache.samza.SamzaException
+import org.apache.samza.config.{Config, ConfigRewriter}
+import org.apache.samza.config.JobConfig.Config2Job
import org.apache.samza.config.factories.PropertiesConfigFactory
-import scala.Some
+import org.apache.samza.job.ApplicationStatus.Running
import org.apache.samza.util.Util
-import scala.collection.mutable.Buffer
+import org.apache.samza.util.CommandLine
+import grizzled.slf4j.Logging
import scala.collection.JavaConversions._
object JobRunner extends Logging {
def main(args: Array[String]) {
- // Define parameters.
- var parser = new OptionParser()
- val configFactoryOpt =
- parser.accepts("config-factory", "The config factory to use to read your config file.")
- .withRequiredArg
- .ofType(classOf[java.lang.String])
- .describedAs("com.foo.bar.ClassName")
- .defaultsTo(classOf[PropertiesConfigFactory].getName)
- val configPathOpt =
- parser.accepts("config-path", "URI location to a config file (e.g. file:///some/local/path.properties). " +
- "If multiple files are given they are all used with later files overriding any values that appear in earlier files.")
- .withRequiredArg
- .ofType(classOf[URI])
- .describedAs("path")
- val configOverrideOpt =
- parser.accepts("config", "A configuration value in the form key=value. Command line properties override any configuration values given.")
- .withRequiredArg
- .ofType(classOf[KeyValuePair])
- .describedAs("key=value")
- var options = parser.parse(args: _*)
-
- // Verify legitimate parameters.
- if (!options.has(configPathOpt)) {
- parser.printHelpOn(System.err)
- System.exit(-1)
- }
-
- // Set up the job parameters.
- val configFactoryClass = options.valueOf(configFactoryOpt)
- val configPaths = options.valuesOf(configPathOpt)
- val configFactory = Class.forName(configFactoryClass).newInstance.asInstanceOf[ConfigFactory]
- val configOverrides = options.valuesOf(configOverrideOpt).map(kv => (kv.key, kv.value)).toMap
-
- val configs: Buffer[java.util.Map[String, String]] = configPaths.map(configFactory.getConfig)
- configs += configOverrides
-
- new JobRunner(new MapConfig(configs)).run
+ // Parse the shared options (--config-factory, --config-path, --config) and
+ // build the job config; CommandLine.loadConfig prints usage and exits if no
+ // --config-path was given.
+ val cmdline = new CommandLine
+ val options = cmdline.parser.parse(args: _*)
+ val config = cmdline.loadConfig(options)
+ new JobRunner(config).run
}
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7d7ba7fb/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala b/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala
new file mode 100644
index 0000000..f26501b
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util
+
+import java.net.URI
+import joptsimple.{OptionParser, OptionSet}
+import joptsimple.util.KeyValuePair
+import org.apache.samza.config.{ConfigFactory, MapConfig}
+import org.apache.samza.config.factories.PropertiesConfigFactory
+import scala.collection.mutable.Buffer
+import scala.collection.JavaConversions._
+
+/**
+ * The command-line options shared by Samza's command-line tools (for example
+ * JobRunner): a config factory class, one or more config file URIs, and
+ * individual key=value config overrides. Tools can use this class directly,
+ * or subclass it to add their own options.
+ */
+class CommandLine {
+  val parser = new OptionParser()
+
+  val configFactoryOpt =
+    parser
+      .accepts("config-factory", "The config factory to use to read your config file.")
+      .withRequiredArg
+      .ofType(classOf[java.lang.String])
+      .describedAs("com.foo.bar.ClassName")
+      .defaultsTo(classOf[PropertiesConfigFactory].getName)
+
+  val configPathOpt =
+    parser
+      .accepts("config-path", "URI location to a config file (e.g. file:///some/local/path.properties). " +
+        "If multiple files are given they are all used with later files overriding any values that appear in earlier files.")
+      .withRequiredArg
+      .ofType(classOf[URI])
+      .describedAs("path")
+
+  val configOverrideOpt =
+    parser
+      .accepts("config", "A configuration value in the form key=value. Command line properties override any configuration values given.")
+      .withRequiredArg
+      .ofType(classOf[KeyValuePair])
+      .describedAs("key=value")
+
+  /** Factory created by the most recent loadConfig call (null before that); subclasses may reuse it to read further files. */
+  var configFactory: ConfigFactory = null
+
+  /**
+   * Builds the job config from parsed options. Every --config-path file is read
+   * with the --config-factory class, and the --config key=value pairs are layered
+   * on top. If no --config-path was given, prints usage to stderr and calls
+   * System.exit(-1).
+   */
+  def loadConfig(options: OptionSet) = {
+    val hasConfigPath = options.has(configPathOpt)
+    if (!hasConfigPath) {
+      parser.printHelpOn(System.err)
+      System.exit(-1)
+    }
+
+    val factoryClassName = options.valueOf(configFactoryOpt)
+    val pathUris = options.valuesOf(configPathOpt)
+    configFactory = Class.forName(factoryClassName).newInstance.asInstanceOf[ConfigFactory]
+    val overrides = options.valuesOf(configOverrideOpt).map(pair => pair.key -> pair.value).toMap
+
+    // Order matters: later maps override earlier ones, so the command-line overrides go last.
+    val layers: Buffer[java.util.Map[String, String]] = pathUris.map(configFactory.getConfig)
+    layers += overrides
+    new MapConfig(layers)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7d7ba7fb/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
new file mode 100644
index 0000000..bc54f9e
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint
+
+import org.junit.{Before, Test}
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.junit.AssertionsForJUnit
+import org.scalatest.mock.MockitoSugar
+import scala.collection.JavaConversions._
+import org.apache.samza.Partition
+import org.apache.samza.checkpoint.TestCheckpointTool.{MockCheckpointManagerFactory, MockSystemFactory}
+import org.apache.samza.config.{Config, MapConfig, SystemConfig, TaskConfig}
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.system.{SystemAdmin, SystemConsumer, SystemFactory, SystemProducer, SystemStream, SystemStreamMetadata, SystemStreamPartition}
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+
+object TestCheckpointTool {
+  // Mocks installed by each test. The config under test names the factories
+  // below by class, so whatever these vars hold is what the code under test
+  // receives when it instantiates those factories.
+  var checkpointManager: CheckpointManager = null
+  var systemAdmin: SystemAdmin = null
+  var systemConsumer: SystemConsumer = null
+  var systemProducer: SystemProducer = null
+
+  /** Hands out the shared checkpointManager mock. */
+  class MockCheckpointManagerFactory extends CheckpointManagerFactory {
+    override def getCheckpointManager(config: Config, registry: MetricsRegistry): CheckpointManager =
+      checkpointManager
+  }
+
+  /** Hands out the shared system admin/consumer/producer mocks, regardless of system name. */
+  class MockSystemFactory extends SystemFactory {
+    override def getAdmin(systemName: String, config: Config): SystemAdmin = systemAdmin
+    override def getConsumer(systemName: String, config: Config, registry: MetricsRegistry): SystemConsumer = systemConsumer
+    override def getProducer(systemName: String, config: Config, registry: MetricsRegistry): SystemProducer = systemProducer
+  }
+}
+
+/**
+ * Unit tests for CheckpointTool. The checkpoint manager and system admin are
+ * Mockito mocks, supplied through the mock factories in the companion object.
+ */
+class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
+ var config: MapConfig = null
+
+ // Builds a config with one input stream (test.foo) whose metadata reports two
+ // partitions (0 and 1), and stubs a stored checkpoint for each partition.
+ @Before
+ def setup {
+ config = new MapConfig(Map(
+ TaskConfig.INPUT_STREAMS -> "test.foo",
+ TaskConfig.CHECKPOINT_MANAGER_FACTORY -> classOf[MockCheckpointManagerFactory].getName,
+ SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getName
+ ))
+ val metadata = new SystemStreamMetadata("foo", Map[Partition, SystemStreamPartitionMetadata](
+ new Partition(0) -> new SystemStreamPartitionMetadata("0", "100", "101"),
+ new Partition(1) -> new SystemStreamPartitionMetadata("0", "200", "201")
+ ))
+
+ TestCheckpointTool.checkpointManager = mock[CheckpointManager]
+ TestCheckpointTool.systemAdmin = mock[SystemAdmin]
+ // NOTE(review): this stub only matches if the implicitly converted Set("foo")
+ // equals the argument the partition lookup actually passes — confirm if it changes.
+ when(TestCheckpointTool.systemAdmin.getSystemStreamMetadata(Set("foo")))
+ .thenReturn(Map("foo" -> metadata))
+ when(TestCheckpointTool.checkpointManager.readLastCheckpoint(new Partition(0)))
+ .thenReturn(new Checkpoint(Map(new SystemStream("test", "foo") -> "1234")))
+ when(TestCheckpointTool.checkpointManager.readLastCheckpoint(new Partition(1)))
+ .thenReturn(new Checkpoint(Map(new SystemStream("test", "foo") -> "4321")))
+ }
+
+ // With no new offsets the tool reads every partition's checkpoint and writes nothing.
+ @Test
+ def testReadLatestCheckpoint {
+ new CheckpointTool(config, null).run
+ verify(TestCheckpointTool.checkpointManager).readLastCheckpoint(new Partition(0))
+ verify(TestCheckpointTool.checkpointManager).readLastCheckpoint(new Partition(1))
+ verify(TestCheckpointTool.checkpointManager, never()).writeCheckpoint(any(), any())
+ }
+
+ // With new offsets the tool writes one checkpoint per mentioned partition.
+ @Test
+ def testOverwriteCheckpoint {
+ new CheckpointTool(config, Map(
+ new SystemStreamPartition("test", "foo", new Partition(0)) -> "42",
+ new SystemStreamPartition("test", "foo", new Partition(1)) -> "43"
+ )).run
+ verify(TestCheckpointTool.checkpointManager)
+ .writeCheckpoint(new Partition(0), new Checkpoint(Map(new SystemStream("test", "foo") -> "42")))
+ verify(TestCheckpointTool.checkpointManager)
+ .writeCheckpoint(new Partition(1), new Checkpoint(Map(new SystemStream("test", "foo") -> "43")))
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7d7ba7fb/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index e98b50a..fed6eee 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -270,6 +270,8 @@ class KafkaCheckpointManager(
}
)
}
+
+  /** Human-readable identity of this manager (CheckpointTool logs it on startup). */
+  override def toString = "KafkaCheckpointManager [systemName=" + systemName + ", checkpointTopic=" + checkpointTopic + "]"
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7d7ba7fb/samza-shell/src/main/bash/checkpoint-tool.sh
----------------------------------------------------------------------
diff --git a/samza-shell/src/main/bash/checkpoint-tool.sh b/samza-shell/src/main/bash/checkpoint-tool.sh
new file mode 100755
index 0000000..1a455df
--- /dev/null
+++ b/samza-shell/src/main/bash/checkpoint-tool.sh
@@ -0,0 +1,19 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+exec $(dirname $0)/run-class.sh org.apache.samza.checkpoint.CheckpointTool $@