You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@samza.apache.org by cr...@apache.org on 2015/04/08 01:25:57 UTC

[25/36] samza git commit: SAMZA-588; expose SamzaContainerContext through TaskContext

SAMZA-588; expose SamzaContainerContext through TaskContext


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2c9020b7
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2c9020b7
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2c9020b7

Branch: refs/heads/samza-sql
Commit: 2c9020b7f20d609113868c224fb8138c36521e0e
Parents: 91289f4
Author: Benjamin Fradet <be...@gmail.com>
Authored: Tue Mar 24 12:47:00 2015 -0700
Committer: Chris Riccomini <cr...@apache.org>
Committed: Tue Mar 24 12:47:00 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/samza/task/TaskContext.java |  3 +++
 .../apache/samza/container/SamzaContainer.scala |  3 ++-
 .../apache/samza/container/TaskInstance.scala   |  2 ++
 .../samza/container/TestSamzaContainer.scala    |  5 +++-
 .../samza/container/TestTaskInstance.scala      | 25 ++++++++++++++------
 5 files changed, 29 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/2c9020b7/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
index 929409e..5b337a6 100644
--- a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
+++ b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.task;
 
+import org.apache.samza.container.SamzaContainerContext;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.system.SystemStreamPartition;
@@ -38,6 +39,8 @@ public interface TaskContext {
 
   TaskName getTaskName();
 
+  SamzaContainerContext getSamzaContainerContext();
+
   /**
    * Set the starting offset for the given {@link org.apache.samza.system.SystemStreamPartition}. Offsets
    * can only be set for a {@link org.apache.samza.system.SystemStreamPartition} assigned to this task

http://git-wip-us.apache.org/repos/asf/samza/blob/2c9020b7/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 9fc3b55..5416dd6 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -414,7 +414,7 @@ object SamzaContainer extends Logging {
     // Increment by 1 because partition starts from 0, but we need the absolute count,
     // this value is used for change log topic creation.
     val maxChangeLogStreamPartitions = containerModel.getTasks.values
-            .max(Ordering.by{task:TaskModel => task.getChangelogPartition.getPartitionId})
+            .max(Ordering.by { task:TaskModel => task.getChangelogPartition.getPartitionId })
             .getChangelogPartition.getPartitionId + 1
 
     val taskInstances: Map[TaskName, TaskInstance] = containerModel.getTasks.values.map(taskModel => {
@@ -494,6 +494,7 @@ object SamzaContainer extends Logging {
         metrics = taskInstanceMetrics,
         consumerMultiplexer = consumerMultiplexer,
         collector = collector,
+        containerContext = containerContext,
         offsetManager = offsetManager,
         storageManager = storageManager,
         reporters = reporters,

http://git-wip-us.apache.org/repos/asf/samza/blob/2c9020b7/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index be0b55a..c5a5ea5 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -46,6 +46,7 @@ class TaskInstance(
   metrics: TaskInstanceMetrics,
   consumerMultiplexer: SystemConsumers,
   collector: TaskInstanceCollector,
+  containerContext: SamzaContainerContext,
   offsetManager: OffsetManager = new OffsetManager,
   storageManager: TaskStorageManager = null,
   reporters: Map[String, MetricsReporter] = Map(),
@@ -65,6 +66,7 @@ class TaskInstance(
       null
     }
     def getTaskName = taskName
+    def getSamzaContainerContext = containerContext
 
     override def setStartingOffset(ssp: SystemStreamPartition, offset: String): Unit = {
       offsetManager.startingOffsets += (ssp -> offset)

http://git-wip-us.apache.org/repos/asf/samza/blob/2c9020b7/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 81742bc..cab31ca 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -121,13 +121,16 @@ class TestSamzaContainer extends AssertionsForJUnit {
       Map[String, SystemProducer](),
       new SerdeManager)
     val collector = new TaskInstanceCollector(producerMultiplexer)
+    val containerContext = new SamzaContainerContext(0, config, Set[TaskName](taskName))
     val taskInstance: TaskInstance = new TaskInstance(
       task,
       taskName,
       config,
       new TaskInstanceMetrics,
       consumerMultiplexer,
-      collector)
+      collector,
+      containerContext
+    )
     val runLoop = new RunLoop(
       taskInstances = Map(taskName -> taskInstance),
       consumerMultiplexer = consumerMultiplexer,

http://git-wip-us.apache.org/repos/asf/samza/blob/2c9020b7/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 54b4df8..7caad28 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -69,6 +69,7 @@ class TestTaskInstance {
     val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
     val taskName = new TaskName("taskName")
     val collector = new TaskInstanceCollector(producerMultiplexer)
+    val containerContext = new SamzaContainerContext(0, config, Set[TaskName](taskName))
     val taskInstance: TaskInstance = new TaskInstance(
       task,
       taskName,
@@ -76,6 +77,7 @@ class TestTaskInstance {
       new TaskInstanceMetrics,
       consumerMultiplexer,
       collector,
+      containerContext,
       offsetManager)
     // Pretend we got a message with offset 2 and next offset 3.
     val coordinator = new ReadableCoordinator(taskName)
@@ -159,6 +161,7 @@ class TestTaskInstance {
     val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
     val taskName = new TaskName("taskName")
     val collector = new TaskInstanceCollector(producerMultiplexer)
+    val containerContext = new SamzaContainerContext(0, config, Set[TaskName](taskName))
 
     val registry = new MetricsRegistryMap
     val taskMetrics = new TaskInstanceMetrics(registry = registry)
@@ -169,6 +172,7 @@ class TestTaskInstance {
       taskMetrics,
       consumerMultiplexer,
       collector,
+      containerContext,
       offsetManager,
       exceptionHandler = TaskInstanceExceptionHandler(taskMetrics, config))
 
@@ -211,6 +215,7 @@ class TestTaskInstance {
     val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
     val taskName = new TaskName("taskName")
     val collector = new TaskInstanceCollector(producerMultiplexer)
+    val containerContext = new SamzaContainerContext(0, config, Set[TaskName](taskName))
 
     val registry = new MetricsRegistryMap
     val taskMetrics = new TaskInstanceMetrics(registry = registry)
@@ -221,6 +226,7 @@ class TestTaskInstance {
       taskMetrics,
       consumerMultiplexer,
       collector,
+      containerContext,
       offsetManager,
       exceptionHandler = TaskInstanceExceptionHandler(taskMetrics, config))
 
@@ -261,23 +267,28 @@ class TestTaskInstance {
       override def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator): Unit = {}
     }
 
+    val config = new MapConfig()
     val chooser = new RoundRobinChooser()
     val consumers = new SystemConsumers(chooser, consumers = Map.empty)
     val producers = new SystemProducers(Map.empty, new SerdeManager())
+    val metrics = new TaskInstanceMetrics()
+    val taskName = new TaskName("Offset Reset Task 0")
     val collector = new TaskInstanceCollector(producers)
+    val containerContext = new SamzaContainerContext(0, config, Set[TaskName](taskName))
 
     val offsetManager = new OffsetManager()
 
     offsetManager.startingOffsets ++= Map(partition0 -> "0", partition1 -> "0")
 
     val taskInstance = new TaskInstance(
-      task = task,
-      taskName = new TaskName("Offset Reset Task 0"),
-      config = new MapConfig(),
-      metrics = new TaskInstanceMetrics(),
-      consumerMultiplexer = consumers,
-      collector = collector,
-      offsetManager = offsetManager,
+      task,
+      taskName,
+      config,
+      metrics,
+      consumers,
+      collector,
+      containerContext,
+      offsetManager,
       systemStreamPartitions = Set(partition0, partition1) )
 
     taskInstance.initTask