You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/03/09 01:07:09 UTC
[samza] 01/03: Bugfix: Recent CSM refactor was causing some metrics
to not be emitted. Fixed <task>-restore-time metric.
This is an automated email from the ASF dual-hosted git repository.
shanthoosh pushed a commit to branch 1.1.0
in repository https://gitbox.apache.org/repos/asf/samza.git
commit 655d60f5ed6a8b3cc6bace6b535744373736945a
Author: Ray Matharu <rm...@linkedin.com>
AuthorDate: Fri Mar 8 12:21:20 2019 -0800
Bugfix: Recent CSM refactor was causing some metrics to not be emitted. Fixed <task>-restore-time metric.
Author: Ray Matharu <rm...@linkedin.com>
Reviewers: prateekm
Closes #942 from rmatharu/test-metricsfix and squashes the following commits:
c5a072f4 [Ray Matharu] minor fix
3c1e25ad [Ray Matharu] minor
b13b485a [Ray Matharu] minor
7e60ad8b [Ray Matharu] minor
7634a470 [Ray Matharu] removing CSM's registerMetrics
78ee37e1 [Ray Matharu] removing unused imports
f207f03d [Ray Matharu] minor
8aec4fa7 [Ray Matharu] minor
0c121262 [Ray Matharu] Fixing metrics after moving sideInputs to CSM
8bd3b19f [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
6b58c862 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
89be3652 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
6fe29268 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
96e3d8f3 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
40f68a61 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
497602ab [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
1a72dc48 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
36c0b339 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
12ca96bb [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
ee7daac8 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
08006871 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
916f66ae [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
2c09b081 [Ray Matharu] Rocksdb bug fix
---
.../org/apache/samza/container/SamzaContainer.scala | 9 +++++----
.../apache/samza/container/SamzaContainerMetrics.scala | 4 ++--
.../scala/org/apache/samza/container/TaskInstance.scala | 7 -------
.../apache/samza/storage/ContainerStorageManager.java | 17 +++++++++--------
.../apache/samza/system/SystemConsumersMetrics.scala | 7 +++++--
.../java/org/apache/samza/task/TestAsyncRunLoop.java | 1 -
.../org/apache/samza/container/TestSamzaContainer.scala | 1 +
.../samza/processor/StreamProcessorTestUtils.scala | 1 +
8 files changed, 23 insertions(+), 24 deletions(-)
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 5df4678..ab89396 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -594,7 +594,6 @@ object SamzaContainer extends Logging {
offsetManager = offsetManager,
storageManager = storageManager,
tableManager = tableManager,
- reporters = reporters,
systemStreamPartitions = taskSSPs -- taskSideInputSSPs,
exceptionHandler = TaskInstanceExceptionHandler(taskInstanceMetrics.get(taskName).get, config),
jobModel = jobModel,
@@ -663,6 +662,7 @@ object SamzaContainer extends Logging {
new SamzaContainer(
config = config,
taskInstances = taskInstances,
+ taskInstanceMetrics = taskInstanceMetrics,
runLoop = runLoop,
systemAdmins = systemAdmins,
consumerMultiplexer = consumerMultiplexer,
@@ -700,6 +700,7 @@ object SamzaContainer extends Logging {
class SamzaContainer(
config: Config,
taskInstances: Map[TaskName, TaskInstance],
+ taskInstanceMetrics: Map[TaskName, TaskInstanceMetrics],
runLoop: Runnable,
systemAdmins: SystemAdmins,
consumerMultiplexer: SystemConsumers,
@@ -879,9 +880,9 @@ class SamzaContainer(
}
def startMetrics {
- info("Registering task instances with metrics.")
-
- taskInstances.values.foreach(_.registerMetrics)
+ info("Registering task instance metrics.")
+ reporters.values.foreach(reporter =>
+ taskInstanceMetrics.values.foreach(taskMetrics => reporter.register(taskMetrics.source, taskMetrics.registry)))
info("Starting JVM metrics.")
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
index d5cf6c6..326156b 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
@@ -51,8 +51,8 @@ class SamzaContainerMetrics(
val exceptions = newListGauge[DiagnosticsExceptionEvent]("exceptions")
- def addStoreRestorationGauge(taskName: TaskName, storeName: String) {
- taskStoreRestorationMetrics.put(taskName, newGauge("%s-%s-restore-time" format(taskName.toString, storeName), -1L))
+ def addStoresRestorationGauge(taskName: TaskName) {
+ taskStoreRestorationMetrics.put(taskName, newGauge("%s-restore-time" format(taskName.toString), -1L))
}
}
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 0c8102b..fa17f24 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -52,7 +52,6 @@ class TaskInstance(
val offsetManager: OffsetManager = new OffsetManager,
storageManager: TaskStorageManager = null,
tableManager: TableManager = null,
- reporters: Map[String, MetricsReporter] = Map(),
val systemStreamPartitions: Set[SystemStreamPartition] = Set(),
val exceptionHandler: TaskInstanceExceptionHandler = new TaskInstanceExceptionHandler,
jobModel: JobModel = null,
@@ -105,12 +104,6 @@ class TaskInstance(
val streamsToDeleteCommittedMessages: Set[String] = config.getStreamIds.filter(config.getDeleteCommittedMessages).map(config.getPhysicalName).toSet
- def registerMetrics {
- debug("Registering metrics for taskName: %s" format taskName)
-
- reporters.values.foreach(_.register(metrics.source, metrics.registry))
- }
-
def registerOffsets {
debug("Registering offsets for taskName: %s" format taskName)
offsetManager.register(taskName, systemStreamPartitions)
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index ad9637d..da61a35 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -113,7 +113,8 @@ public class ContainerStorageManager {
private static final Logger LOG = LoggerFactory.getLogger(ContainerStorageManager.class);
private static final String RESTORE_THREAD_NAME = "Samza Restore Thread-%d";
private static final String SIDEINPUTS_FLUSH_THREAD_NAME = "SideInputs Flush Thread";
- private static final String SIDEINPUTS_METRICS_NAME = "samza-container-%s-sideinputs";
+ private static final String SIDEINPUTS_METRICS_PREFIX = "side-inputs-";
+ // We use a prefix to differentiate the SystemConsumersMetrics for side-inputs from the ones in SamzaContainer
/** Maps containing relevant per-task objects */
private final Map<TaskName, Map<String, StorageEngine>> taskStores;
@@ -215,7 +216,7 @@ public class ContainerStorageManager {
this.storeConsumers = createStoreIndexedMap(this.changelogSystemStreams, storeSystemConsumers);
// creating task restore managers
- this.taskRestoreManagers = createTaskRestoreManagers(systemAdmins, clock);
+ this.taskRestoreManagers = createTaskRestoreManagers(systemAdmins, clock, this.samzaContainerMetrics);
// create side input storage managers
sideInputStorageManagers = createSideInputStorageManagers(clock);
@@ -229,15 +230,15 @@ public class ContainerStorageManager {
scala.collection.immutable.Map<SystemStream, SystemStreamMetadata> inputStreamMetadata = streamMetadataCache.getStreamMetadata(JavaConversions.asScalaSet(
this.sideInputSystemStreams.values().stream().flatMap(Set::stream).collect(Collectors.toSet())).toSet(), false);
- SystemConsumersMetrics systemConsumersMetrics = new SystemConsumersMetrics(
- new MetricsRegistryMap(String.format(SIDEINPUTS_METRICS_NAME, containerModel.getId())));
+ SystemConsumersMetrics sideInputSystemConsumersMetrics = new SystemConsumersMetrics(samzaContainerMetrics.registry(), SIDEINPUTS_METRICS_PREFIX);
+ // we use the same registry as samza-container-metrics
MessageChooser chooser = DefaultChooser.apply(inputStreamMetadata, new RoundRobinChooserFactory(), config,
- systemConsumersMetrics.registry(), systemAdmins);
+ sideInputSystemConsumersMetrics.registry(), systemAdmins);
sideInputSystemConsumers =
new SystemConsumers(chooser, ScalaJavaUtil.toScalaMap(this.sideInputConsumers), serdeManager,
- systemConsumersMetrics, SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(), SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(),
+ sideInputSystemConsumersMetrics, SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(), SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(),
SystemConsumers.DEFAULT_POLL_INTERVAL_MS(), ScalaJavaUtil.toScalaFunction(() -> System.nanoTime()));
}
@@ -336,11 +337,12 @@ public class ContainerStorageManager {
return storeConsumers;
}
- private Map<TaskName, TaskRestoreManager> createTaskRestoreManagers(SystemAdmins systemAdmins, Clock clock) {
+ private Map<TaskName, TaskRestoreManager> createTaskRestoreManagers(SystemAdmins systemAdmins, Clock clock, SamzaContainerMetrics samzaContainerMetrics) {
Map<TaskName, TaskRestoreManager> taskRestoreManagers = new HashMap<>();
containerModel.getTasks().forEach((taskName, taskModel) -> {
taskRestoreManagers.put(taskName,
new TaskRestoreManager(taskModel, changelogSystemStreams, getNonSideInputStores(taskName), systemAdmins, clock));
+ samzaContainerMetrics.addStoresRestorationGauge(taskName);
});
return taskRestoreManagers;
}
@@ -573,7 +575,6 @@ public class ContainerStorageManager {
return this.sideInputStorageManagers.values().stream().collect(Collectors.toSet());
}
-
public void start() throws SamzaException {
restoreStores();
if (sideInputsPresent()) {
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
index 43d381b..afdce08 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
@@ -19,12 +19,13 @@
package org.apache.samza.system
-import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.metrics.Counter
import org.apache.samza.metrics.MetricsHelper
+import org.apache.samza.metrics.ReadableMetricsRegistry
-class SystemConsumersMetrics(val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
+class SystemConsumersMetrics(val registry: ReadableMetricsRegistry = new MetricsRegistryMap,
+ val prefix: String = "") extends MetricsHelper {
val choseNull = newCounter("chose-null")
val choseObject = newCounter("chose-object")
val deserializationError = newCounter("deserialization error")
@@ -55,6 +56,8 @@ class SystemConsumersMetrics(val registry: MetricsRegistry = new MetricsRegistry
}
}
+ override def getPrefix: String = prefix
+
def registerSystemStreamPartition(systemStreamPartition: SystemStreamPartition) {
systemStreamMessagesChosen += systemStreamPartition -> newCounter("%s-%s-%d-messages-chosen" format (systemStreamPartition.getSystem, systemStreamPartition.getStream, systemStreamPartition.getPartition.getPartitionId))
}
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
index acaecdb..48f8619 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
@@ -97,7 +97,6 @@ public class TestAsyncRunLoop {
manager,
null,
null,
- null,
sspSet,
new TaskInstanceExceptionHandler(taskInstanceMetrics, new scala.collection.immutable.HashSet<String>()),
null,
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index d1f60bc..e75fe54 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -280,6 +280,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
this.samzaContainer = new SamzaContainer(
this.config,
Map(TASK_NAME -> this.taskInstance),
+ Map(TASK_NAME -> new TaskInstanceMetrics),
this.runLoop,
this.systemAdmins,
this.consumerMultiplexer,
diff --git a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
index 9bb485a..3ff651b 100644
--- a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
+++ b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
@@ -63,6 +63,7 @@ object StreamProcessorTestUtils {
val container = new SamzaContainer(
config = config,
taskInstances = Map(taskName -> taskInstance),
+ taskInstanceMetrics = Map(taskName -> new TaskInstanceMetrics),
runLoop = mockRunloop,
systemAdmins = adminMultiplexer,
consumerMultiplexer = consumerMultiplexer,