You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/11/16 18:46:14 UTC

samza git commit: SAMZA-1504: Allow user to register container-level metrics

Repository: samza
Updated Branches:
  refs/heads/master fc6c70684 -> 2e2e00ed0


SAMZA-1504: Allow user to register container-level metrics

This change allows user to register the metrics on the per-container basis.

Tested in beam runner and works as expected.

Author: xiliu <xi...@xiliu-ld1.linkedin.biz>

Reviewers: Prateek Maheshwari <pr...@apache.org>

Closes #361 from xinyuiscool/SAMZA-1504


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2e2e00ed
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2e2e00ed
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2e2e00ed

Branch: refs/heads/master
Commit: 2e2e00ed0dbab3aaa5fcaee18629932747ecba4c
Parents: fc6c706
Author: Xinyu Liu <xi...@gmail.com>
Authored: Thu Nov 16 10:46:05 2017 -0800
Committer: xiliu <xi...@xiliu-ld1.linkedin.biz>
Committed: Thu Nov 16 10:46:05 2017 -0800

----------------------------------------------------------------------
 .../apache/samza/container/SamzaContainerContext.java  |  7 ++++++-
 .../java/org/apache/samza/storage/StorageRecovery.java |  2 +-
 .../org/apache/samza/container/SamzaContainer.scala    |  2 +-
 .../samza/operators/impl/TestOperatorImplGraph.java    |  2 +-
 .../apache/samza/container/TestSamzaContainer.scala    | 13 +++++++------
 .../org/apache/samza/container/TestTaskInstance.scala  | 10 +++++-----
 .../samza/processor/StreamProcessorTestUtils.scala     |  3 ++-
 .../apache/samza/storage/kv/RocksDbKeyValueReader.java |  3 ++-
 .../test/performance/TestKeyValuePerformance.scala     |  2 +-
 9 files changed, 26 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/2e2e00ed/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java b/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
index 4076a51..6e13f7a 100644
--- a/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
+++ b/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
@@ -20,6 +20,7 @@
 package org.apache.samza.container;
 
 import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -31,6 +32,7 @@ public class SamzaContainerContext {
   public final String id;
   public final Config config;
   public final Collection<TaskName> taskNames;
+  public final MetricsRegistry metricsRegistry;
 
   /**
    * An immutable context object that can passed to tasks to give them information
@@ -38,13 +40,16 @@ public class SamzaContainerContext {
    * @param id The id of the container.
    * @param config The job configuration.
    * @param taskNames The set of taskName keys for which this container is responsible.
+   * @param metricsRegistry the {@link MetricsRegistry} for the container metrics
    */
   public SamzaContainerContext(
       String id,
       Config config,
-      Collection<TaskName> taskNames) {
+      Collection<TaskName> taskNames,
+      MetricsRegistry metricsRegistry) {
     this.id = id;
     this.config = config;
     this.taskNames = Collections.unmodifiableCollection(taskNames);
+    this.metricsRegistry = metricsRegistry;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2e2e00ed/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index e50f221..a47183e 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -212,7 +212,7 @@ public class StorageRecovery extends CommandLine {
     for (ContainerModel containerModel : containers.values()) {
       HashMap<String, StorageEngine> taskStores = new HashMap<String, StorageEngine>();
       SamzaContainerContext containerContext = new SamzaContainerContext(containerModel.getProcessorId(), jobConfig, containerModel.getTasks()
-          .keySet());
+          .keySet(), new MetricsRegistryMap());
 
       for (TaskModel taskModel : containerModel.getTasks().values()) {
         HashMap<String, SystemConsumer> storeConsumers = getStoreConsumers();

http://git-wip-us.apache.org/repos/asf/samza/blob/2e2e00ed/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 6071c1f..412e9dc 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -460,7 +460,7 @@ object SamzaContainer extends Logging {
       .asScala
       .map(_.getTaskName)
       .toSet
-    val containerContext = new SamzaContainerContext(containerId, config, taskNames.asJava)
+    val containerContext = new SamzaContainerContext(containerId, config, taskNames.asJava, samzaContainerMetrics.registry)
 
     // TODO not sure how we should make this config based, or not. Kind of
     // strange, since it has some dynamic directories when used with YARN.

http://git-wip-us.apache.org/repos/asf/samza/blob/2e2e00ed/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
index 3f48cf2..1448f79 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@ -157,7 +157,7 @@ public class TestOperatorImplGraph {
     when(taskModel.getSystemStreamPartitions()).thenReturn(Collections.emptySet());
     when(mockTaskContext.getJobModel()).thenReturn(jobModel);
     SamzaContainerContext containerContext =
-        new SamzaContainerContext("0", mockConfig, Collections.singleton(new TaskName("task 0")));
+        new SamzaContainerContext("0", mockConfig, Collections.singleton(new TaskName("task 0")), new MetricsRegistryMap());
     when(mockTaskContext.getSamzaContainerContext()).thenReturn(containerContext);
     OperatorImplGraph opImplGraph =
         new OperatorImplGraph(streamGraph, mockConfig, mockTaskContext, mock(Clock.class));

http://git-wip-us.apache.org/repos/asf/samza/blob/2e2e00ed/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 980c2a2..8ff6e88 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -23,6 +23,7 @@ import java.util
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicReference
 
+import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.{SamzaContainerStatus, Partition}
 import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
 import org.apache.samza.config.{Config, MapConfig}
@@ -164,7 +165,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       Map[String, SystemProducer](),
       new SerdeManager)
     val collector = new TaskInstanceCollector(producerMultiplexer)
-    val containerContext = new SamzaContainerContext("0", config, Set[TaskName](taskName))
+    val containerContext = new SamzaContainerContext("0", config, Set[TaskName](taskName), new MetricsRegistryMap)
     val taskInstance: TaskInstance = new TaskInstance(
       task,
       taskName,
@@ -245,7 +246,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       Map[String, SystemProducer](),
       new SerdeManager)
     val collector = new TaskInstanceCollector(producerMultiplexer)
-    val containerContext = new SamzaContainerContext("0", config, Set[TaskName](taskName))
+    val containerContext = new SamzaContainerContext("0", config, Set[TaskName](taskName), new MetricsRegistryMap)
     val taskInstance: TaskInstance = new TaskInstance(
       task,
       taskName,
@@ -325,7 +326,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       Map[String, SystemProducer](),
       new SerdeManager)
     val collector = new TaskInstanceCollector(producerMultiplexer)
-    val containerContext = new SamzaContainerContext("0", config, Set[TaskName](taskName))
+    val containerContext = new SamzaContainerContext("0", config, Set[TaskName](taskName), new MetricsRegistryMap)
     val taskInstance: TaskInstance = new TaskInstance(
       task,
       taskName,
@@ -404,7 +405,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       Map[String, SystemProducer](),
       new SerdeManager)
     val collector = new TaskInstanceCollector(producerMultiplexer)
-    val containerContext = new SamzaContainerContext("0", config, Set[TaskName](taskName))
+    val containerContext = new SamzaContainerContext("0", config, Set[TaskName](taskName), new MetricsRegistryMap)
     val taskInstance: TaskInstance = new TaskInstance(
       task,
       taskName,
@@ -480,7 +481,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       Map[String, SystemProducer](),
       new SerdeManager)
     val collector = new TaskInstanceCollector(producerMultiplexer)
-    val containerContext = new SamzaContainerContext("0", config, Set[TaskName](taskName))
+    val containerContext = new SamzaContainerContext("0", config, Set[TaskName](taskName), new MetricsRegistryMap)
     val taskInstance: TaskInstance = new TaskInstance(
       task,
       taskName,
@@ -548,7 +549,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       Map[String, SystemProducer](),
       new SerdeManager)
     val collector = new TaskInstanceCollector(producerMultiplexer)
-    val containerContext = new SamzaContainerContext("0", config, Set[TaskName](taskName))
+    val containerContext = new SamzaContainerContext("0", config, Set[TaskName](taskName), new MetricsRegistryMap)
     val mockTaskStorageManager = mock[TaskStorageManager]
 
     when(mockTaskStorageManager.init).thenAnswer(new Answer[String] {

http://git-wip-us.apache.org/repos/asf/samza/blob/2e2e00ed/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index bf23d44..c45c5a1 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -73,7 +73,7 @@ class TestTaskInstance {
     val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
     val taskName = new TaskName("taskName")
     val collector = new TaskInstanceCollector(producerMultiplexer)
-    val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava)
+    val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava, new MetricsRegistryMap)
     val taskInstance: TaskInstance = new TaskInstance(
       task,
       taskName,
@@ -168,7 +168,7 @@ class TestTaskInstance {
     val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
     val taskName = new TaskName("taskName")
     val collector = new TaskInstanceCollector(producerMultiplexer)
-    val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava)
+    val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava, new MetricsRegistryMap)
 
     val registry = new MetricsRegistryMap
     val taskMetrics = new TaskInstanceMetrics(registry = registry)
@@ -225,7 +225,7 @@ class TestTaskInstance {
     val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
     val taskName = new TaskName("taskName")
     val collector = new TaskInstanceCollector(producerMultiplexer)
-    val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava)
+    val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava, new MetricsRegistryMap)
 
     val registry = new MetricsRegistryMap
     val taskMetrics = new TaskInstanceMetrics(registry = registry)
@@ -284,7 +284,7 @@ class TestTaskInstance {
     val metrics = new TaskInstanceMetrics()
     val taskName = new TaskName("Offset Reset Task 0")
     val collector = new TaskInstanceCollector(producers)
-    val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava)
+    val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava, new MetricsRegistryMap)
 
     val offsetManager = new OffsetManager()
 
@@ -319,7 +319,7 @@ class TestTaskInstance {
     val metrics = new TaskInstanceMetrics()
     val taskName = new TaskName("testing")
     val collector = new TaskInstanceCollector(producers)
-    val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava)
+    val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava, new MetricsRegistryMap)
     val offsetManager = new OffsetManager()
     offsetManager.startingOffsets += taskName -> Map(partition0 -> "0", partition1 -> "100")
     val systemAdmins = Map("system" -> new MockSystemAdmin)

http://git-wip-us.apache.org/repos/asf/samza/blob/2e2e00ed/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
index f574cc3..b239119 100644
--- a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
+++ b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
@@ -22,6 +22,7 @@ import java.util.Collections
 
 import org.apache.samza.config.MapConfig
 import org.apache.samza.container.{SamzaContainerListener, RunLoop, SamzaContainer, SamzaContainerContext, SamzaContainerMetrics, TaskInstance, TaskInstanceMetrics, TaskName}
+import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.serializers.SerdeManager
 import org.apache.samza.system.chooser.RoundRobinChooser
 import org.apache.samza.system.{SystemConsumer, SystemConsumers, SystemProducer, SystemProducers}
@@ -39,7 +40,7 @@ object StreamProcessorTestUtils {
       Map[String, SystemProducer](),
       new SerdeManager)
     val collector = new TaskInstanceCollector(producerMultiplexer)
-    val containerContext = new SamzaContainerContext("0", config, Collections.singleton[TaskName](taskName))
+    val containerContext = new SamzaContainerContext("0", config, Collections.singleton[TaskName](taskName), new MetricsRegistryMap)
     val taskInstance: TaskInstance = new TaskInstance(
       streamTask,
       taskName,

http://git-wip-us.apache.org/repos/asf/samza/blob/2e2e00ed/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
index 8e8cc31..1fa78f8 100644
--- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
@@ -27,6 +27,7 @@ import org.apache.samza.config.JavaSerializerConfig;
 import org.apache.samza.config.JavaStorageConfig;
 import org.apache.samza.container.SamzaContainerContext;
 import org.apache.samza.container.TaskName;
+import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.SerdeFactory;
 import org.apache.samza.util.Util;
@@ -66,7 +67,7 @@ public class RocksDbKeyValueReader {
     ArrayList<TaskName> taskNameList = new ArrayList<TaskName>();
     taskNameList.add(new TaskName("read-rocks-db"));
     SamzaContainerContext samzaContainerContext =
-        new SamzaContainerContext("0",  config, taskNameList);
+        new SamzaContainerContext("0",  config, taskNameList, new MetricsRegistryMap());
     Options options = RocksDbOptionsHelper.options(config, samzaContainerContext);
 
     // open the db

http://git-wip-us.apache.org/repos/asf/samza/blob/2e2e00ed/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
index d481782..9f3c9a8 100644
--- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
+++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
@@ -116,7 +116,7 @@ object TestKeyValuePerformance extends Logging {
           new TaskInstanceCollector(producerMultiplexer),
           new MetricsRegistryMap,
           null,
-          new SamzaContainerContext("0", config, taskNames)
+          new SamzaContainerContext("0", config, taskNames, new MetricsRegistryMap)
         )
 
         val db = if(!engine.isInstanceOf[KeyValueStorageEngine[_,_]]) {