You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/11/16 18:46:14 UTC
samza git commit: SAMZA-1504: Allow user to register container-level metrics
Repository: samza
Updated Branches:
refs/heads/master fc6c70684 -> 2e2e00ed0
SAMZA-1504: Allow user to register container-level metrics
This change allows users to register metrics on a per-container basis.
It was tested in the Beam runner and works as expected.
Author: xiliu <xi...@xiliu-ld1.linkedin.biz>
Reviewers: Prateek Maheshwari <pr...@apache.org>
Closes #361 from xinyuiscool/SAMZA-1504
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2e2e00ed
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2e2e00ed
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2e2e00ed
Branch: refs/heads/master
Commit: 2e2e00ed0dbab3aaa5fcaee18629932747ecba4c
Parents: fc6c706
Author: Xinyu Liu <xi...@gmail.com>
Authored: Thu Nov 16 10:46:05 2017 -0800
Committer: xiliu <xi...@xiliu-ld1.linkedin.biz>
Committed: Thu Nov 16 10:46:05 2017 -0800
----------------------------------------------------------------------
.../apache/samza/container/SamzaContainerContext.java | 7 ++++++-
.../java/org/apache/samza/storage/StorageRecovery.java | 2 +-
.../org/apache/samza/container/SamzaContainer.scala | 2 +-
.../samza/operators/impl/TestOperatorImplGraph.java | 2 +-
.../apache/samza/container/TestSamzaContainer.scala | 13 +++++++------
.../org/apache/samza/container/TestTaskInstance.scala | 10 +++++-----
.../samza/processor/StreamProcessorTestUtils.scala | 3 ++-
.../apache/samza/storage/kv/RocksDbKeyValueReader.java | 3 ++-
.../test/performance/TestKeyValuePerformance.scala | 2 +-
9 files changed, 26 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/2e2e00ed/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java b/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
index 4076a51..6e13f7a 100644
--- a/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
+++ b/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
@@ -20,6 +20,7 @@
package org.apache.samza.container;
import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
import java.util.Collection;
import java.util.Collections;
@@ -31,6 +32,7 @@ public class SamzaContainerContext {
public final String id;
public final Config config;
public final Collection<TaskName> taskNames;
+ public final MetricsRegistry metricsRegistry;
/**
* An immutable context object that can passed to tasks to give them information
@@ -38,13 +40,16 @@ public class SamzaContainerContext {
* @param id The id of the container.
* @param config The job configuration.
* @param taskNames The set of taskName keys for which this container is responsible.
+ * @param metricsRegistry the {@link MetricsRegistry} for the container metrics
*/
public SamzaContainerContext(
String id,
Config config,
- Collection<TaskName> taskNames) {
+ Collection<TaskName> taskNames,
+ MetricsRegistry metricsRegistry) {
this.id = id;
this.config = config;
this.taskNames = Collections.unmodifiableCollection(taskNames);
+ this.metricsRegistry = metricsRegistry;
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2e2e00ed/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index e50f221..a47183e 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -212,7 +212,7 @@ public class StorageRecovery extends CommandLine {
for (ContainerModel containerModel : containers.values()) {
HashMap<String, StorageEngine> taskStores = new HashMap<String, StorageEngine>();
SamzaContainerContext containerContext = new SamzaContainerContext(containerModel.getProcessorId(), jobConfig, containerModel.getTasks()
- .keySet());
+ .keySet(), new MetricsRegistryMap());
for (TaskModel taskModel : containerModel.getTasks().values()) {
HashMap<String, SystemConsumer> storeConsumers = getStoreConsumers();
http://git-wip-us.apache.org/repos/asf/samza/blob/2e2e00ed/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 6071c1f..412e9dc 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -460,7 +460,7 @@ object SamzaContainer extends Logging {
.asScala
.map(_.getTaskName)
.toSet
- val containerContext = new SamzaContainerContext(containerId, config, taskNames.asJava)
+ val containerContext = new SamzaContainerContext(containerId, config, taskNames.asJava, samzaContainerMetrics.registry)
// TODO not sure how we should make this config based, or not. Kind of
// strange, since it has some dynamic directories when used with YARN.
http://git-wip-us.apache.org/repos/asf/samza/blob/2e2e00ed/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
index 3f48cf2..1448f79 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@ -157,7 +157,7 @@ public class TestOperatorImplGraph {
when(taskModel.getSystemStreamPartitions()).thenReturn(Collections.emptySet());
when(mockTaskContext.getJobModel()).thenReturn(jobModel);
SamzaContainerContext containerContext =
- new SamzaContainerContext("0", mockConfig, Collections.singleton(new TaskName("task 0")));
+ new SamzaContainerContext("0", mockConfig, Collections.singleton(new TaskName("task 0")), new MetricsRegistryMap());
when(mockTaskContext.getSamzaContainerContext()).thenReturn(containerContext);
OperatorImplGraph opImplGraph =
new OperatorImplGraph(streamGraph, mockConfig, mockTaskContext, mock(Clock.class));
http://git-wip-us.apache.org/repos/asf/samza/blob/2e2e00ed/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 980c2a2..8ff6e88 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -23,6 +23,7 @@ import java.util
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
+import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.{SamzaContainerStatus, Partition}
import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
import org.apache.samza.config.{Config, MapConfig}
@@ -164,7 +165,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
Map[String, SystemProducer](),
new SerdeManager)
val collector = new TaskInstanceCollector(producerMultiplexer)
- val containerContext = new SamzaContainerContext("0", config, Set[TaskName](taskName))
+ val containerContext = new SamzaContainerContext("0", config, Set[TaskName](taskName), new MetricsRegistryMap)
val taskInstance: TaskInstance = new TaskInstance(
task,
taskName,
@@ -245,7 +246,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
Map[String, SystemProducer](),
new SerdeManager)
val collector = new TaskInstanceCollector(producerMultiplexer)
- val containerContext = new SamzaContainerContext("0", config, Set[TaskName](taskName))
+ val containerContext = new SamzaContainerContext("0", config, Set[TaskName](taskName), new MetricsRegistryMap)
val taskInstance: TaskInstance = new TaskInstance(
task,
taskName,
@@ -325,7 +326,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
Map[String, SystemProducer](),
new SerdeManager)
val collector = new TaskInstanceCollector(producerMultiplexer)
- val containerContext = new SamzaContainerContext("0", config, Set[TaskName](taskName))
+ val containerContext = new SamzaContainerContext("0", config, Set[TaskName](taskName), new MetricsRegistryMap)
val taskInstance: TaskInstance = new TaskInstance(
task,
taskName,
@@ -404,7 +405,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
Map[String, SystemProducer](),
new SerdeManager)
val collector = new TaskInstanceCollector(producerMultiplexer)
- val containerContext = new SamzaContainerContext("0", config, Set[TaskName](taskName))
+ val containerContext = new SamzaContainerContext("0", config, Set[TaskName](taskName), new MetricsRegistryMap)
val taskInstance: TaskInstance = new TaskInstance(
task,
taskName,
@@ -480,7 +481,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
Map[String, SystemProducer](),
new SerdeManager)
val collector = new TaskInstanceCollector(producerMultiplexer)
- val containerContext = new SamzaContainerContext("0", config, Set[TaskName](taskName))
+ val containerContext = new SamzaContainerContext("0", config, Set[TaskName](taskName), new MetricsRegistryMap)
val taskInstance: TaskInstance = new TaskInstance(
task,
taskName,
@@ -548,7 +549,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
Map[String, SystemProducer](),
new SerdeManager)
val collector = new TaskInstanceCollector(producerMultiplexer)
- val containerContext = new SamzaContainerContext("0", config, Set[TaskName](taskName))
+ val containerContext = new SamzaContainerContext("0", config, Set[TaskName](taskName), new MetricsRegistryMap)
val mockTaskStorageManager = mock[TaskStorageManager]
when(mockTaskStorageManager.init).thenAnswer(new Answer[String] {
http://git-wip-us.apache.org/repos/asf/samza/blob/2e2e00ed/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index bf23d44..c45c5a1 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -73,7 +73,7 @@ class TestTaskInstance {
val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
val taskName = new TaskName("taskName")
val collector = new TaskInstanceCollector(producerMultiplexer)
- val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava)
+ val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava, new MetricsRegistryMap)
val taskInstance: TaskInstance = new TaskInstance(
task,
taskName,
@@ -168,7 +168,7 @@ class TestTaskInstance {
val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
val taskName = new TaskName("taskName")
val collector = new TaskInstanceCollector(producerMultiplexer)
- val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava)
+ val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava, new MetricsRegistryMap)
val registry = new MetricsRegistryMap
val taskMetrics = new TaskInstanceMetrics(registry = registry)
@@ -225,7 +225,7 @@ class TestTaskInstance {
val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
val taskName = new TaskName("taskName")
val collector = new TaskInstanceCollector(producerMultiplexer)
- val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava)
+ val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava, new MetricsRegistryMap)
val registry = new MetricsRegistryMap
val taskMetrics = new TaskInstanceMetrics(registry = registry)
@@ -284,7 +284,7 @@ class TestTaskInstance {
val metrics = new TaskInstanceMetrics()
val taskName = new TaskName("Offset Reset Task 0")
val collector = new TaskInstanceCollector(producers)
- val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava)
+ val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava, new MetricsRegistryMap)
val offsetManager = new OffsetManager()
@@ -319,7 +319,7 @@ class TestTaskInstance {
val metrics = new TaskInstanceMetrics()
val taskName = new TaskName("testing")
val collector = new TaskInstanceCollector(producers)
- val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava)
+ val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava, new MetricsRegistryMap)
val offsetManager = new OffsetManager()
offsetManager.startingOffsets += taskName -> Map(partition0 -> "0", partition1 -> "100")
val systemAdmins = Map("system" -> new MockSystemAdmin)
http://git-wip-us.apache.org/repos/asf/samza/blob/2e2e00ed/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
index f574cc3..b239119 100644
--- a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
+++ b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
@@ -22,6 +22,7 @@ import java.util.Collections
import org.apache.samza.config.MapConfig
import org.apache.samza.container.{SamzaContainerListener, RunLoop, SamzaContainer, SamzaContainerContext, SamzaContainerMetrics, TaskInstance, TaskInstanceMetrics, TaskName}
+import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.serializers.SerdeManager
import org.apache.samza.system.chooser.RoundRobinChooser
import org.apache.samza.system.{SystemConsumer, SystemConsumers, SystemProducer, SystemProducers}
@@ -39,7 +40,7 @@ object StreamProcessorTestUtils {
Map[String, SystemProducer](),
new SerdeManager)
val collector = new TaskInstanceCollector(producerMultiplexer)
- val containerContext = new SamzaContainerContext("0", config, Collections.singleton[TaskName](taskName))
+ val containerContext = new SamzaContainerContext("0", config, Collections.singleton[TaskName](taskName), new MetricsRegistryMap)
val taskInstance: TaskInstance = new TaskInstance(
streamTask,
taskName,
http://git-wip-us.apache.org/repos/asf/samza/blob/2e2e00ed/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
index 8e8cc31..1fa78f8 100644
--- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
@@ -27,6 +27,7 @@ import org.apache.samza.config.JavaSerializerConfig;
import org.apache.samza.config.JavaStorageConfig;
import org.apache.samza.container.SamzaContainerContext;
import org.apache.samza.container.TaskName;
+import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerdeFactory;
import org.apache.samza.util.Util;
@@ -66,7 +67,7 @@ public class RocksDbKeyValueReader {
ArrayList<TaskName> taskNameList = new ArrayList<TaskName>();
taskNameList.add(new TaskName("read-rocks-db"));
SamzaContainerContext samzaContainerContext =
- new SamzaContainerContext("0", config, taskNameList);
+ new SamzaContainerContext("0", config, taskNameList, new MetricsRegistryMap());
Options options = RocksDbOptionsHelper.options(config, samzaContainerContext);
// open the db
http://git-wip-us.apache.org/repos/asf/samza/blob/2e2e00ed/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
index d481782..9f3c9a8 100644
--- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
+++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
@@ -116,7 +116,7 @@ object TestKeyValuePerformance extends Logging {
new TaskInstanceCollector(producerMultiplexer),
new MetricsRegistryMap,
null,
- new SamzaContainerContext("0", config, taskNames)
+ new SamzaContainerContext("0", config, taskNames, new MetricsRegistryMap)
)
val db = if(!engine.isInstanceOf[KeyValueStorageEngine[_,_]]) {