You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2015/04/08 01:25:57 UTC
[25/36] samza git commit: SAMZA-588;
expose SamzaContainerContext through TaskContext
SAMZA-588; expose SamzaContainerContext through TaskContext
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2c9020b7
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2c9020b7
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2c9020b7
Branch: refs/heads/samza-sql
Commit: 2c9020b7f20d609113868c224fb8138c36521e0e
Parents: 91289f4
Author: Benjamin Fradet <be...@gmail.com>
Authored: Tue Mar 24 12:47:00 2015 -0700
Committer: Chris Riccomini <cr...@apache.org>
Committed: Tue Mar 24 12:47:00 2015 -0700
----------------------------------------------------------------------
.../java/org/apache/samza/task/TaskContext.java | 3 +++
.../apache/samza/container/SamzaContainer.scala | 3 ++-
.../apache/samza/container/TaskInstance.scala | 2 ++
.../samza/container/TestSamzaContainer.scala | 5 +++-
.../samza/container/TestTaskInstance.scala | 25 ++++++++++++++------
5 files changed, 29 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/2c9020b7/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
index 929409e..5b337a6 100644
--- a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
+++ b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
@@ -19,6 +19,7 @@
package org.apache.samza.task;
+import org.apache.samza.container.SamzaContainerContext;
import org.apache.samza.container.TaskName;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.SystemStreamPartition;
@@ -38,6 +39,8 @@ public interface TaskContext {
TaskName getTaskName();
+ SamzaContainerContext getSamzaContainerContext();
+
/**
* Set the starting offset for the given {@link org.apache.samza.system.SystemStreamPartition}. Offsets
* can only be set for a {@link org.apache.samza.system.SystemStreamPartition} assigned to this task
http://git-wip-us.apache.org/repos/asf/samza/blob/2c9020b7/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 9fc3b55..5416dd6 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -414,7 +414,7 @@ object SamzaContainer extends Logging {
// Increment by 1 because partition starts from 0, but we need the absolute count,
// this value is used for change log topic creation.
val maxChangeLogStreamPartitions = containerModel.getTasks.values
- .max(Ordering.by{task:TaskModel => task.getChangelogPartition.getPartitionId})
+ .max(Ordering.by { task:TaskModel => task.getChangelogPartition.getPartitionId })
.getChangelogPartition.getPartitionId + 1
val taskInstances: Map[TaskName, TaskInstance] = containerModel.getTasks.values.map(taskModel => {
@@ -494,6 +494,7 @@ object SamzaContainer extends Logging {
metrics = taskInstanceMetrics,
consumerMultiplexer = consumerMultiplexer,
collector = collector,
+ containerContext = containerContext,
offsetManager = offsetManager,
storageManager = storageManager,
reporters = reporters,
http://git-wip-us.apache.org/repos/asf/samza/blob/2c9020b7/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index be0b55a..c5a5ea5 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -46,6 +46,7 @@ class TaskInstance(
metrics: TaskInstanceMetrics,
consumerMultiplexer: SystemConsumers,
collector: TaskInstanceCollector,
+ containerContext: SamzaContainerContext,
offsetManager: OffsetManager = new OffsetManager,
storageManager: TaskStorageManager = null,
reporters: Map[String, MetricsReporter] = Map(),
@@ -65,6 +66,7 @@ class TaskInstance(
null
}
def getTaskName = taskName
+ def getSamzaContainerContext = containerContext
override def setStartingOffset(ssp: SystemStreamPartition, offset: String): Unit = {
offsetManager.startingOffsets += (ssp -> offset)
http://git-wip-us.apache.org/repos/asf/samza/blob/2c9020b7/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 81742bc..cab31ca 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -121,13 +121,16 @@ class TestSamzaContainer extends AssertionsForJUnit {
Map[String, SystemProducer](),
new SerdeManager)
val collector = new TaskInstanceCollector(producerMultiplexer)
+ val containerContext = new SamzaContainerContext(0, config, Set[TaskName](taskName))
val taskInstance: TaskInstance = new TaskInstance(
task,
taskName,
config,
new TaskInstanceMetrics,
consumerMultiplexer,
- collector)
+ collector,
+ containerContext
+ )
val runLoop = new RunLoop(
taskInstances = Map(taskName -> taskInstance),
consumerMultiplexer = consumerMultiplexer,
http://git-wip-us.apache.org/repos/asf/samza/blob/2c9020b7/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 54b4df8..7caad28 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -69,6 +69,7 @@ class TestTaskInstance {
val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
val taskName = new TaskName("taskName")
val collector = new TaskInstanceCollector(producerMultiplexer)
+ val containerContext = new SamzaContainerContext(0, config, Set[TaskName](taskName))
val taskInstance: TaskInstance = new TaskInstance(
task,
taskName,
@@ -76,6 +77,7 @@ class TestTaskInstance {
new TaskInstanceMetrics,
consumerMultiplexer,
collector,
+ containerContext,
offsetManager)
// Pretend we got a message with offset 2 and next offset 3.
val coordinator = new ReadableCoordinator(taskName)
@@ -159,6 +161,7 @@ class TestTaskInstance {
val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
val taskName = new TaskName("taskName")
val collector = new TaskInstanceCollector(producerMultiplexer)
+ val containerContext = new SamzaContainerContext(0, config, Set[TaskName](taskName))
val registry = new MetricsRegistryMap
val taskMetrics = new TaskInstanceMetrics(registry = registry)
@@ -169,6 +172,7 @@ class TestTaskInstance {
taskMetrics,
consumerMultiplexer,
collector,
+ containerContext,
offsetManager,
exceptionHandler = TaskInstanceExceptionHandler(taskMetrics, config))
@@ -211,6 +215,7 @@ class TestTaskInstance {
val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
val taskName = new TaskName("taskName")
val collector = new TaskInstanceCollector(producerMultiplexer)
+ val containerContext = new SamzaContainerContext(0, config, Set[TaskName](taskName))
val registry = new MetricsRegistryMap
val taskMetrics = new TaskInstanceMetrics(registry = registry)
@@ -221,6 +226,7 @@ class TestTaskInstance {
taskMetrics,
consumerMultiplexer,
collector,
+ containerContext,
offsetManager,
exceptionHandler = TaskInstanceExceptionHandler(taskMetrics, config))
@@ -261,23 +267,28 @@ class TestTaskInstance {
override def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator): Unit = {}
}
+ val config = new MapConfig()
val chooser = new RoundRobinChooser()
val consumers = new SystemConsumers(chooser, consumers = Map.empty)
val producers = new SystemProducers(Map.empty, new SerdeManager())
+ val metrics = new TaskInstanceMetrics()
+ val taskName = new TaskName("Offset Reset Task 0")
val collector = new TaskInstanceCollector(producers)
+ val containerContext = new SamzaContainerContext(0, config, Set[TaskName](taskName))
val offsetManager = new OffsetManager()
offsetManager.startingOffsets ++= Map(partition0 -> "0", partition1 -> "0")
val taskInstance = new TaskInstance(
- task = task,
- taskName = new TaskName("Offset Reset Task 0"),
- config = new MapConfig(),
- metrics = new TaskInstanceMetrics(),
- consumerMultiplexer = consumers,
- collector = collector,
- offsetManager = offsetManager,
+ task,
+ taskName,
+ config,
+ metrics,
+ consumers,
+ collector,
+ containerContext,
+ offsetManager,
systemStreamPartitions = Set(partition0, partition1) )
taskInstance.initTask