You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2015/03/17 00:05:40 UTC

samza git commit: SAMZA-568; allow tasks to override offsets in init method

Repository: samza
Updated Branches:
  refs/heads/master cad2c8286 -> 537262e28


SAMZA-568; allow tasks to override offsets in init method


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/537262e2
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/537262e2
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/537262e2

Branch: refs/heads/master
Commit: 537262e284d6c126ea434c1af74e66e92cf08e5e
Parents: cad2c82
Author: Ben Kirwin <be...@kirw.in>
Authored: Mon Mar 16 16:05:28 2015 -0700
Committer: Chris Riccomini <cr...@apache.org>
Committed: Mon Mar 16 16:05:28 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/samza/task/TaskContext.java | 10 ++++
 .../apache/samza/container/TaskInstance.scala   |  4 ++
 .../samza/container/TestTaskInstance.scala      | 57 +++++++++++++++++---
 3 files changed, 65 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/537262e2/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
index 6d10212..929409e 100644
--- a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
+++ b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
@@ -37,4 +37,14 @@ public interface TaskContext {
   Object getStore(String name);
 
   TaskName getTaskName();
+
+  /**
+   * Set the starting offset for the given {@link org.apache.samza.system.SystemStreamPartition}. Offsets
+   * can only be set for a {@link org.apache.samza.system.SystemStreamPartition} assigned to this task
+   * (as returned by {@link #getSystemStreamPartitions()}); trying to set the offset for any other partition
+   * will have no effect.
+   *
+   * NOTE: this feature is experimental, and the API may change in a future release.
+   */
+  void setStartingOffset(SystemStreamPartition ssp, String offset);
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/537262e2/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index a583ff9..be0b55a 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -65,6 +65,10 @@ class TaskInstance(
       null
     }
     def getTaskName = taskName
+
+    override def setStartingOffset(ssp: SystemStreamPartition, offset: String): Unit = {
+      offsetManager.startingOffsets += (ssp -> offset)
+    }
   }
 
   def registerMetrics {

http://git-wip-us.apache.org/repos/asf/samza/blob/537262e2/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 11eab16..54b4df8 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -40,12 +40,7 @@ import org.apache.samza.system.SystemStreamMetadata
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.system.chooser.RoundRobinChooser
-import org.apache.samza.task.MessageCollector
-import org.apache.samza.task.ReadableCoordinator
-import org.apache.samza.task.StreamTask
-import org.apache.samza.task.TaskCoordinator
-import org.apache.samza.task.TaskInstanceCollector
-import org.apache.samza.task.WindowableTask
+import org.apache.samza.task._
 import org.junit.Assert._
 import org.junit.Test
 import org.scalatest.Assertions.intercept
@@ -240,4 +235,54 @@ class TestTaskInstance {
     assertEquals(2L, getCount(group, classOf[NonFatalException].getName))
     assertEquals(1L, getCount(group, classOf[FatalException].getName))
   }
+
+
+  /**
+   * Tests that the init() method of a task can override the existing offset
+   * assignment.
+   */
+  @Test
+  def testManualOffsetReset {
+
+    val partition0 = new SystemStreamPartition("system", "stream", new Partition(0))
+    val partition1 = new SystemStreamPartition("system", "stream", new Partition(1))
+
+    val task = new StreamTask with InitableTask {
+
+      override def init(config: Config, context: TaskContext): Unit = {
+
+        assertTrue("Can only update offsets for assigned partition",
+          context.getSystemStreamPartitions.contains(partition1)
+        )
+
+        context.setStartingOffset(partition1, "10")
+      }
+
+      override def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator): Unit = {}
+    }
+
+    val chooser = new RoundRobinChooser()
+    val consumers = new SystemConsumers(chooser, consumers = Map.empty)
+    val producers = new SystemProducers(Map.empty, new SerdeManager())
+    val collector = new TaskInstanceCollector(producers)
+
+    val offsetManager = new OffsetManager()
+
+    offsetManager.startingOffsets ++= Map(partition0 -> "0", partition1 -> "0")
+
+    val taskInstance = new TaskInstance(
+      task = task,
+      taskName = new TaskName("Offset Reset Task 0"),
+      config = new MapConfig(),
+      metrics = new TaskInstanceMetrics(),
+      consumerMultiplexer = consumers,
+      collector = collector,
+      offsetManager = offsetManager,
+      systemStreamPartitions = Set(partition0, partition1) )
+
+    taskInstance.initTask
+
+    assertEquals(Some("0"), offsetManager.getStartingOffset(partition0))
+    assertEquals(Some("10"), offsetManager.getStartingOffset(partition1))
+  }
 }