You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/05/29 17:53:16 UTC
git commit: SAMZA-267; offset manager should not fail if task.inputs is changed
Repository: incubator-samza
Updated Branches:
refs/heads/master 95cee714e -> 7929e47c2
SAMZA-267; offset manager should not fail if task.inputs is changed
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/7929e47c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/7929e47c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/7929e47c
Branch: refs/heads/master
Commit: 7929e47c27d480d9987357d5532b10d983b7ee32
Parents: 95cee71
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Thu May 29 08:53:02 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Thu May 29 08:53:02 2014 -0700
----------------------------------------------------------------------
.../apache/samza/checkpoint/OffsetManager.scala | 11 ++++++++++-
.../samza/checkpoint/TestOffsetManager.scala | 19 +++++++++++++++++++
2 files changed, 29 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7929e47c/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index a8333db..9487b58 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -253,7 +253,16 @@ class OffsetManager(
checkpointManager.start
- lastProcessedOffsets ++= getPartitions.flatMap(restoreOffsetsFromCheckpoint(_))
+ lastProcessedOffsets ++= getPartitions
+ .flatMap(restoreOffsetsFromCheckpoint(_))
+ .filter {
+ case (systemStreamPartition, offset) =>
+ val shouldKeep = offsetSettings.contains(systemStreamPartition.getSystemStream)
+ if (!shouldKeep) {
+ info("Ignoring previously checkpointed offset %s for %s since the offset is for a stream that is not currently an input stream." format (offset, systemStreamPartition))
+ }
+ shouldKeep
+ }
} else {
debug("Skipping offset load from checkpoint manager because no manager was defined.")
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7929e47c/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index e021327..552f8c2 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -187,6 +187,25 @@ class TestOffsetManager {
}
}
+ @Test
+ def testOutdatedStreamInCheckpoint {
+ val systemStream0 = new SystemStream("test-system-0", "test-stream")
+ val systemStream1 = new SystemStream("test-system-1", "test-stream")
+ val partition0 = new Partition(0)
+ val systemStreamPartition0 = new SystemStreamPartition(systemStream0, partition0)
+ val systemStreamPartition1 = new SystemStreamPartition(systemStream1, partition0)
+ val testStreamMetadata = new SystemStreamMetadata(systemStream0.getStream, Map(partition0 -> new SystemStreamPartitionMetadata("0", "1", "2")))
+ val systemStreamMetadata = Map(systemStream0 -> testStreamMetadata)
+ val offsetSettings = Map(systemStream0 -> OffsetSetting(testStreamMetadata, OffsetType.UPCOMING, false))
+ val checkpointManager = getCheckpointManager(systemStreamPartition1)
+ val offsetManager = new OffsetManager(offsetSettings, checkpointManager)
+ offsetManager.register(systemStreamPartition0)
+ offsetManager.start
+ assertTrue(checkpointManager.isStarted)
+ assertEquals(1, checkpointManager.registered.size)
+ assertNull(offsetManager.getLastProcessedOffset(systemStreamPartition1).getOrElse(null))
+ }
+
private def getCheckpointManager(systemStreamPartition: SystemStreamPartition) = {
val checkpoint = new Checkpoint(Map(systemStreamPartition.getSystemStream -> "45"))