You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/11/21 19:27:05 UTC

samza git commit: SAMZA-1505: Fix CheckpointTool writing only one ssp per task

Repository: samza
Updated Branches:
  refs/heads/master 9fb9a7673 -> f2c14fa63


SAMZA-1505: Fix CheckpointTool writing only one ssp per task

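Currently, when CheckpointTool is used to write checkpoints, it writes a checkpoint for only a single SSP (SystemStreamPartition) per task. Debugging shows the cause: parseOffsets calls flatMap() on the config, which is a Scala Map, with a function that returns Option[(taskname -> Map(ssp -> offset))]. Because flatMap on a Map builds another Map, the result is keyed by taskname. Pairs for the same task overwrite each other instead of being merged, so only one SSP per task survives. This patch collects the (taskname, Map(ssp -> offset)) pairs explicitly into a list and then applies groupBy() to that list, which fixes the problem.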

Author: xiliu <xi...@xiliu-ld1.linkedin.biz>

Reviewers: Jake Maes <jm...@gmail.com>

Closes #364 from xinyuiscool/SAMZA-1505


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f2c14fa6
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f2c14fa6
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f2c14fa6

Branch: refs/heads/master
Commit: f2c14fa632ad201f60a21b52e9d50991954b5195
Parents: 9fb9a76
Author: Xinyu Liu <xi...@gmail.com>
Authored: Tue Nov 21 11:26:55 2017 -0800
Committer: xiliu <xi...@xiliu-ld1.linkedin.biz>
Committed: Tue Nov 21 11:26:55 2017 -0800

----------------------------------------------------------------------
 .../samza/checkpoint/CheckpointTool.scala       | 15 ++++++++-----
 .../samza/checkpoint/TestCheckpointTool.scala   | 23 +++++++++++++++++++-
 2 files changed, 31 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/f2c14fa6/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
index 0d66613..e1bb3ea 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
@@ -24,7 +24,7 @@ import java.util.regex.Pattern
 import joptsimple.OptionSet
 import org.apache.samza.checkpoint.CheckpointTool.TaskNameToCheckpointMap
 import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.config.{JobConfig, ConfigRewriter, Config, StreamConfig}
+import org.apache.samza.config.{JobConfig, ConfigRewriter, Config}
 import org.apache.samza.container.TaskName
 import org.apache.samza.job.JobRunner._
 import org.apache.samza.metrics.MetricsRegistryMap
@@ -34,6 +34,8 @@ import org.apache.samza.{Partition, SamzaException}
 import scala.collection.JavaConverters._
 import org.apache.samza.coordinator.JobModelManager
 
+import scala.collection.mutable.ListBuffer
+
 
 /**
  * Command-line tool for inspecting and manipulating the checkpoints for a job.
@@ -80,19 +82,20 @@ object CheckpointTool {
     var newOffsets: TaskNameToCheckpointMap = null
 
     def parseOffsets(propertiesFile: Config): TaskNameToCheckpointMap = {
-      val taskNameSSPPairs = propertiesFile.asScala.flatMap { case (key, value) => {
+      var checkpoints : ListBuffer[(TaskName, Map[SystemStreamPartition, String])] = ListBuffer()
+      propertiesFile.asScala.foreach { case (key, value) => {
         val matcher = SSP_REGEX.matcher(key)
         if (matcher.matches) {
           val taskname = new TaskName(matcher.group(1))
           val partition = new Partition(Integer.parseInt(matcher.group(4)))
           val ssp = new SystemStreamPartition(matcher.group(2), matcher.group(3), partition)
-          Some(taskname -> Map(ssp -> value))
+          val tuple = (taskname -> Map(ssp -> value))
+          checkpoints += tuple
         } else {
           warn("Warning: ignoring unrecognised property: %s = %s" format (key, value))
-          None
         }
-      }}.toList
-
+      }}
+      val taskNameSSPPairs = checkpoints.toList
       if(taskNameSSPPairs.isEmpty) {
         return null
       }

http://git-wip-us.apache.org/repos/asf/samza/blob/f2c14fa6/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
index 283ccc4..bed013c 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
@@ -19,13 +19,16 @@
 
 package org.apache.samza.checkpoint
 
+import java.util
+
 import org.apache.samza.Partition
+import org.apache.samza.checkpoint.CheckpointTool.CheckpointToolCommandLine
 import org.apache.samza.container.TaskName
 import org.apache.samza.checkpoint.TestCheckpointTool.{MockCheckpointManagerFactory, MockSystemFactory}
 import org.apache.samza.config.{Config, MapConfig, SystemConfig, TaskConfig}
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import org.apache.samza.system.{SystemAdmin, SystemConsumer, SystemFactory, SystemProducer, SystemStream, SystemStreamMetadata, SystemStreamPartition}
+import org.apache.samza.system.{SystemAdmin, SystemConsumer, SystemFactory, SystemProducer, SystemStreamMetadata, SystemStreamPartition}
 import org.junit.{Before, Test}
 import org.mockito.Matchers._
 import org.mockito.Mockito._
@@ -35,6 +38,8 @@ import scala.collection.JavaConverters._
 import org.apache.samza.config.JobConfig
 import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
 
+import scala.collection.immutable.HashMap
+
 object TestCheckpointTool {
   var checkpointManager: CheckpointManager = null
   var systemConsumer: SystemConsumer = null
@@ -106,4 +111,20 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
     verify(TestCheckpointTool.checkpointManager)
       .writeCheckpoint(tn1, new Checkpoint(Map(new SystemStreamPartition("test", "foo", p1) -> "43").asJava))
   }
+
+  @Test
+  def testGrouping(): Unit = {
+    val config : java.util.Map[String, String] = new util.HashMap()
+    config.put("tasknames.Partition 0.systems.kafka-atc-repartitioned-requests.streams.ArticleRead.partitions.0", "0000")
+    config.put("tasknames.Partition 0.systems.kafka-atc-repartitioned-requests.streams.CommunicationRequest.partitions.0", "1111")
+    config.put("tasknames.Partition 1.systems.kafka-atc-repartitioned-requests.streams.ArticleRead.partitions.1", "2222")
+    config.put("tasknames.Partition 1.systems.kafka-atc-repartitioned-requests.streams.CommunicationRequest.partitions.1", "44444")
+    config.put("tasknames.Partition 1.systems.kafka-atc-repartitioned-requests.streams.StateChange.partitions.1", "5555")
+
+    val ccl = new CheckpointToolCommandLine
+    val result = ccl.parseOffsets(new MapConfig(config))
+
+    assert(result(new TaskName("Partition 0")).size == 2)
+    assert(result(new TaskName("Partition 1")).size == 3)
+  }
 }