You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2015/03/17 00:05:40 UTC
samza git commit: SAMZA-568;
allow tasks to override offsets in init method
Repository: samza
Updated Branches:
refs/heads/master cad2c8286 -> 537262e28
SAMZA-568; allow tasks to override offsets in init method
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/537262e2
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/537262e2
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/537262e2
Branch: refs/heads/master
Commit: 537262e284d6c126ea434c1af74e66e92cf08e5e
Parents: cad2c82
Author: Ben Kirwin <be...@kirw.in>
Authored: Mon Mar 16 16:05:28 2015 -0700
Committer: Chris Riccomini <cr...@apache.org>
Committed: Mon Mar 16 16:05:28 2015 -0700
----------------------------------------------------------------------
.../java/org/apache/samza/task/TaskContext.java | 10 ++++
.../apache/samza/container/TaskInstance.scala | 4 ++
.../samza/container/TestTaskInstance.scala | 57 +++++++++++++++++---
3 files changed, 65 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/537262e2/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
index 6d10212..929409e 100644
--- a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
+++ b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
@@ -37,4 +37,14 @@ public interface TaskContext {
Object getStore(String name);
TaskName getTaskName();
+
+ /**
+ * Set the starting offset for the given {@link org.apache.samza.system.SystemStreamPartition}. Offsets
+ * can only be set for a {@link org.apache.samza.system.SystemStreamPartition} assigned to this task
+ * (as returned by {@link #getSystemStreamPartitions()}); trying to set the offset for any other partition
+ * will have no effect.
+ *
+ * NOTE: this feature is experimental, and the API may change in a future release.
+ */
+ void setStartingOffset(SystemStreamPartition ssp, String offset);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/537262e2/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index a583ff9..be0b55a 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -65,6 +65,10 @@ class TaskInstance(
null
}
def getTaskName = taskName
+
+ override def setStartingOffset(ssp: SystemStreamPartition, offset: String): Unit = {
+ offsetManager.startingOffsets += (ssp -> offset)
+ }
}
def registerMetrics {
http://git-wip-us.apache.org/repos/asf/samza/blob/537262e2/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 11eab16..54b4df8 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -40,12 +40,7 @@ import org.apache.samza.system.SystemStreamMetadata
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.system.chooser.RoundRobinChooser
-import org.apache.samza.task.MessageCollector
-import org.apache.samza.task.ReadableCoordinator
-import org.apache.samza.task.StreamTask
-import org.apache.samza.task.TaskCoordinator
-import org.apache.samza.task.TaskInstanceCollector
-import org.apache.samza.task.WindowableTask
+import org.apache.samza.task._
import org.junit.Assert._
import org.junit.Test
import org.scalatest.Assertions.intercept
@@ -240,4 +235,54 @@ class TestTaskInstance {
assertEquals(2L, getCount(group, classOf[NonFatalException].getName))
assertEquals(1L, getCount(group, classOf[FatalException].getName))
}
+
+
+ /**
+ * Tests that the init() method of task can override the existing offset
+ * assignment.
+ */
+ @Test
+ def testManualOffsetReset {
+
+ val partition0 = new SystemStreamPartition("system", "stream", new Partition(0))
+ val partition1 = new SystemStreamPartition("system", "stream", new Partition(1))
+
+ val task = new StreamTask with InitableTask {
+
+ override def init(config: Config, context: TaskContext): Unit = {
+
+ assertTrue("Can only update offsets for assigned partition",
+ context.getSystemStreamPartitions.contains(partition1)
+ )
+
+ context.setStartingOffset(partition1, "10")
+ }
+
+ override def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator): Unit = {}
+ }
+
+ val chooser = new RoundRobinChooser()
+ val consumers = new SystemConsumers(chooser, consumers = Map.empty)
+ val producers = new SystemProducers(Map.empty, new SerdeManager())
+ val collector = new TaskInstanceCollector(producers)
+
+ val offsetManager = new OffsetManager()
+
+ offsetManager.startingOffsets ++= Map(partition0 -> "0", partition1 -> "0")
+
+ val taskInstance = new TaskInstance(
+ task = task,
+ taskName = new TaskName("Offset Reset Task 0"),
+ config = new MapConfig(),
+ metrics = new TaskInstanceMetrics(),
+ consumerMultiplexer = consumers,
+ collector = collector,
+ offsetManager = offsetManager,
+ systemStreamPartitions = Set(partition0, partition1) )
+
+ taskInstance.initTask
+
+ assertEquals(Some("0"), offsetManager.getStartingOffset(partition0))
+ assertEquals(Some("10"), offsetManager.getStartingOffset(partition1))
+ }
}