You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/05/29 17:53:16 UTC

git commit: SAMZA-267; offset manager should not fail if task.inputs is changed

Repository: incubator-samza
Updated Branches:
  refs/heads/master 95cee714e -> 7929e47c2


SAMZA-267; offset manager should not fail if task.inputs is changed


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/7929e47c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/7929e47c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/7929e47c

Branch: refs/heads/master
Commit: 7929e47c27d480d9987357d5532b10d983b7ee32
Parents: 95cee71
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Thu May 29 08:53:02 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Thu May 29 08:53:02 2014 -0700

----------------------------------------------------------------------
 .../apache/samza/checkpoint/OffsetManager.scala  | 11 ++++++++++-
 .../samza/checkpoint/TestOffsetManager.scala     | 19 +++++++++++++++++++
 2 files changed, 29 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7929e47c/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index a8333db..9487b58 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -253,7 +253,16 @@ class OffsetManager(
 
       checkpointManager.start
 
-      lastProcessedOffsets ++= getPartitions.flatMap(restoreOffsetsFromCheckpoint(_))
+      lastProcessedOffsets ++= getPartitions
+        .flatMap(restoreOffsetsFromCheckpoint(_))
+        .filter {
+          case (systemStreamPartition, offset) =>
+            val shouldKeep = offsetSettings.contains(systemStreamPartition.getSystemStream)
+            if (!shouldKeep) {
+              info("Ignoring previously checkpointed offset %s for %s since the offset is for a stream that is not currently an input stream." format (offset, systemStreamPartition))
+            }
+            shouldKeep
+        }
     } else {
       debug("Skipping offset load from checkpoint manager because no manager was defined.")
     }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7929e47c/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index e021327..552f8c2 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -187,6 +187,25 @@ class TestOffsetManager {
     }
   }
 
+  @Test
+  def testOutdatedStreamInCheckpoint { // Starting with a checkpoint for a stream that is not in offsetSettings should leave no last-processed offset for that stream.
+    val systemStream0 = new SystemStream("test-system-0", "test-stream") // the only stream placed in offsetSettings below
+    val systemStream1 = new SystemStream("test-system-1", "test-stream") // used only to build the checkpoint
+    val partition0 = new Partition(0)
+    val systemStreamPartition0 = new SystemStreamPartition(systemStream0, partition0)
+    val systemStreamPartition1 = new SystemStreamPartition(systemStream1, partition0)
+    val testStreamMetadata = new SystemStreamMetadata(systemStream0.getStream, Map(partition0 -> new SystemStreamPartitionMetadata("0", "1", "2")))
+    val systemStreamMetadata = Map(systemStream0 -> testStreamMetadata)
+    val offsetSettings = Map(systemStream0 -> OffsetSetting(testStreamMetadata, OffsetType.UPCOMING, false)) // systemStream1 is not included
+    val checkpointManager = getCheckpointManager(systemStreamPartition1) // checkpoint is built from systemStreamPartition1's system stream
+    val offsetManager = new OffsetManager(offsetSettings, checkpointManager)
+    offsetManager.register(systemStreamPartition0) // only the partition of the configured stream is registered
+    offsetManager.start
+    assertTrue(checkpointManager.isStarted)
+    assertEquals(1, checkpointManager.registered.size)
+    assertNull(offsetManager.getLastProcessedOffset(systemStreamPartition1).getOrElse(null)) // no offset expected for the stream outside offsetSettings
+  }
+
   private def getCheckpointManager(systemStreamPartition: SystemStreamPartition) = {
     val checkpoint = new Checkpoint(Map(systemStreamPartition.getSystemStream -> "45"))