You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/11/21 19:27:05 UTC
samza git commit: SAMZA-1505: Fix CheckpointTool writing only one ssp per task
Repository: samza
Updated Branches:
refs/heads/master 9fb9a7673 -> f2c14fa63
SAMZA-1505: Fix CheckpointTool writing only one ssp per task
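Currently, when CheckpointTool is used to write checkpoints, it writes a checkpoint for only a single ssp per task. Debugging shows that the flatMap() over the config entries merges its results by the taskname key. Each matching entry yields an Optional tuple (taskname -> Map(ssp -> offset)). Because flatMap() is called on a Map and returns key-value pairs, its result is also built as a Map. As a result, later entries for the same taskname overwrite earlier ones. This patch collects the tuples explicitly in a list and then applies groupBy() to that list, which fixes the problem.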
Author: xiliu <xi...@xiliu-ld1.linkedin.biz>
Reviewers: Jake Maes <jm...@gmail.com>
Closes #364 from xinyuiscool/SAMZA-1505
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f2c14fa6
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f2c14fa6
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f2c14fa6
Branch: refs/heads/master
Commit: f2c14fa632ad201f60a21b52e9d50991954b5195
Parents: 9fb9a76
Author: Xinyu Liu <xi...@gmail.com>
Authored: Tue Nov 21 11:26:55 2017 -0800
Committer: xiliu <xi...@xiliu-ld1.linkedin.biz>
Committed: Tue Nov 21 11:26:55 2017 -0800
----------------------------------------------------------------------
.../samza/checkpoint/CheckpointTool.scala | 15 ++++++++-----
.../samza/checkpoint/TestCheckpointTool.scala | 23 +++++++++++++++++++-
2 files changed, 31 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/f2c14fa6/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
index 0d66613..e1bb3ea 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
@@ -24,7 +24,7 @@ import java.util.regex.Pattern
import joptsimple.OptionSet
import org.apache.samza.checkpoint.CheckpointTool.TaskNameToCheckpointMap
import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.config.{JobConfig, ConfigRewriter, Config, StreamConfig}
+import org.apache.samza.config.{JobConfig, ConfigRewriter, Config}
import org.apache.samza.container.TaskName
import org.apache.samza.job.JobRunner._
import org.apache.samza.metrics.MetricsRegistryMap
@@ -34,6 +34,8 @@ import org.apache.samza.{Partition, SamzaException}
import scala.collection.JavaConverters._
import org.apache.samza.coordinator.JobModelManager
+import scala.collection.mutable.ListBuffer
+
/**
* Command-line tool for inspecting and manipulating the checkpoints for a job.
@@ -80,19 +82,20 @@ object CheckpointTool {
var newOffsets: TaskNameToCheckpointMap = null
def parseOffsets(propertiesFile: Config): TaskNameToCheckpointMap = {
- val taskNameSSPPairs = propertiesFile.asScala.flatMap { case (key, value) => {
+ var checkpoints : ListBuffer[(TaskName, Map[SystemStreamPartition, String])] = ListBuffer()
+ propertiesFile.asScala.foreach { case (key, value) => {
val matcher = SSP_REGEX.matcher(key)
if (matcher.matches) {
val taskname = new TaskName(matcher.group(1))
val partition = new Partition(Integer.parseInt(matcher.group(4)))
val ssp = new SystemStreamPartition(matcher.group(2), matcher.group(3), partition)
- Some(taskname -> Map(ssp -> value))
+ val tuple = (taskname -> Map(ssp -> value))
+ checkpoints += tuple
} else {
warn("Warning: ignoring unrecognised property: %s = %s" format (key, value))
- None
}
- }}.toList
-
+ }}
+ val taskNameSSPPairs = checkpoints.toList
if(taskNameSSPPairs.isEmpty) {
return null
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f2c14fa6/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
index 283ccc4..bed013c 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
@@ -19,13 +19,16 @@
package org.apache.samza.checkpoint
+import java.util
+
import org.apache.samza.Partition
+import org.apache.samza.checkpoint.CheckpointTool.CheckpointToolCommandLine
import org.apache.samza.container.TaskName
import org.apache.samza.checkpoint.TestCheckpointTool.{MockCheckpointManagerFactory, MockSystemFactory}
import org.apache.samza.config.{Config, MapConfig, SystemConfig, TaskConfig}
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import org.apache.samza.system.{SystemAdmin, SystemConsumer, SystemFactory, SystemProducer, SystemStream, SystemStreamMetadata, SystemStreamPartition}
+import org.apache.samza.system.{SystemAdmin, SystemConsumer, SystemFactory, SystemProducer, SystemStreamMetadata, SystemStreamPartition}
import org.junit.{Before, Test}
import org.mockito.Matchers._
import org.mockito.Mockito._
@@ -35,6 +38,8 @@ import scala.collection.JavaConverters._
import org.apache.samza.config.JobConfig
import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
+import scala.collection.immutable.HashMap
+
object TestCheckpointTool {
var checkpointManager: CheckpointManager = null
var systemConsumer: SystemConsumer = null
@@ -106,4 +111,20 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
verify(TestCheckpointTool.checkpointManager)
.writeCheckpoint(tn1, new Checkpoint(Map(new SystemStreamPartition("test", "foo", p1) -> "43").asJava))
}
+
+ @Test
+ def testGrouping(): Unit = {
+ val config : java.util.Map[String, String] = new util.HashMap()
+ config.put("tasknames.Partition 0.systems.kafka-atc-repartitioned-requests.streams.ArticleRead.partitions.0", "0000")
+ config.put("tasknames.Partition 0.systems.kafka-atc-repartitioned-requests.streams.CommunicationRequest.partitions.0", "1111")
+ config.put("tasknames.Partition 1.systems.kafka-atc-repartitioned-requests.streams.ArticleRead.partitions.1", "2222")
+ config.put("tasknames.Partition 1.systems.kafka-atc-repartitioned-requests.streams.CommunicationRequest.partitions.1", "44444")
+ config.put("tasknames.Partition 1.systems.kafka-atc-repartitioned-requests.streams.StateChange.partitions.1", "5555")
+
+ val ccl = new CheckpointToolCommandLine
+ val result = ccl.parseOffsets(new MapConfig(config))
+
+ assert(result(new TaskName("Partition 0")).size == 2)
+ assert(result(new TaskName("Partition 1")).size == 3)
+ }
}