You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/03/09 01:07:09 UTC

[samza] 01/03: Bugfix: Recent CSM refactor was causing some metrics to not be emitted. Fixed <task>-restore-time metric.

This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a commit to branch 1.1.0
in repository https://gitbox.apache.org/repos/asf/samza.git

commit 655d60f5ed6a8b3cc6bace6b535744373736945a
Author: Ray Matharu <rm...@linkedin.com>
AuthorDate: Fri Mar 8 12:21:20 2019 -0800

    Bugfix: Recent CSM refactor was causing some metrics to not be emitted. Fixed <task>-restore-time metric.
    
    Author: Ray Matharu <rm...@linkedin.com>
    
    Reviewers: prateekm
    
    Closes #942 from rmatharu/test-metricsfix and squashes the following commits:
    
    c5a072f4 [Ray Matharu] minor fix
    3c1e25ad [Ray Matharu] minor
    b13b485a [Ray Matharu] minor
    7e60ad8b [Ray Matharu] minor
    7634a470 [Ray Matharu] removing CSM's registerMetrics
    78ee37e1 [Ray Matharu] removing unused imports
    f207f03d [Ray Matharu] minor
    8aec4fa7 [Ray Matharu] minor
    0c121262 [Ray Matharu] Fixing metrics after moving sideInputs to CSM
    8bd3b19f [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    6b58c862 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    89be3652 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    6fe29268 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    96e3d8f3 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    40f68a61 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    497602ab [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    1a72dc48 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    36c0b339 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    12ca96bb [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    ee7daac8 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    08006871 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    916f66ae [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    2c09b081 [Ray Matharu] Rocksdb bug fix
---
 .../org/apache/samza/container/SamzaContainer.scala     |  9 +++++----
 .../apache/samza/container/SamzaContainerMetrics.scala  |  4 ++--
 .../scala/org/apache/samza/container/TaskInstance.scala |  7 -------
 .../apache/samza/storage/ContainerStorageManager.java   | 17 +++++++++--------
 .../apache/samza/system/SystemConsumersMetrics.scala    |  7 +++++--
 .../java/org/apache/samza/task/TestAsyncRunLoop.java    |  1 -
 .../org/apache/samza/container/TestSamzaContainer.scala |  1 +
 .../samza/processor/StreamProcessorTestUtils.scala      |  1 +
 8 files changed, 23 insertions(+), 24 deletions(-)

diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 5df4678..ab89396 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -594,7 +594,6 @@ object SamzaContainer extends Logging {
           offsetManager = offsetManager,
           storageManager = storageManager,
           tableManager = tableManager,
-          reporters = reporters,
           systemStreamPartitions = taskSSPs -- taskSideInputSSPs,
           exceptionHandler = TaskInstanceExceptionHandler(taskInstanceMetrics.get(taskName).get, config),
           jobModel = jobModel,
@@ -663,6 +662,7 @@ object SamzaContainer extends Logging {
     new SamzaContainer(
       config = config,
       taskInstances = taskInstances,
+      taskInstanceMetrics = taskInstanceMetrics,
       runLoop = runLoop,
       systemAdmins = systemAdmins,
       consumerMultiplexer = consumerMultiplexer,
@@ -700,6 +700,7 @@ object SamzaContainer extends Logging {
 class SamzaContainer(
   config: Config,
   taskInstances: Map[TaskName, TaskInstance],
+  taskInstanceMetrics: Map[TaskName, TaskInstanceMetrics],
   runLoop: Runnable,
   systemAdmins: SystemAdmins,
   consumerMultiplexer: SystemConsumers,
@@ -879,9 +880,9 @@ class SamzaContainer(
   }
 
   def startMetrics {
-    info("Registering task instances with metrics.")
-
-    taskInstances.values.foreach(_.registerMetrics)
+    info("Registering task instance metrics.")
+    reporters.values.foreach(reporter =>
+      taskInstanceMetrics.values.foreach(taskMetrics => reporter.register(taskMetrics.source, taskMetrics.registry)))
 
     info("Starting JVM metrics.")
 
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
index d5cf6c6..326156b 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
@@ -51,8 +51,8 @@ class SamzaContainerMetrics(
 
   val exceptions = newListGauge[DiagnosticsExceptionEvent]("exceptions")
 
-  def addStoreRestorationGauge(taskName: TaskName, storeName: String) {
-    taskStoreRestorationMetrics.put(taskName, newGauge("%s-%s-restore-time" format(taskName.toString, storeName), -1L))
+  def addStoresRestorationGauge(taskName: TaskName) {
+    taskStoreRestorationMetrics.put(taskName, newGauge("%s-restore-time" format(taskName.toString), -1L))
   }
 
 }
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 0c8102b..fa17f24 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -52,7 +52,6 @@ class TaskInstance(
   val offsetManager: OffsetManager = new OffsetManager,
   storageManager: TaskStorageManager = null,
   tableManager: TableManager = null,
-  reporters: Map[String, MetricsReporter] = Map(),
   val systemStreamPartitions: Set[SystemStreamPartition] = Set(),
   val exceptionHandler: TaskInstanceExceptionHandler = new TaskInstanceExceptionHandler,
   jobModel: JobModel = null,
@@ -105,12 +104,6 @@ class TaskInstance(
 
   val streamsToDeleteCommittedMessages: Set[String] = config.getStreamIds.filter(config.getDeleteCommittedMessages).map(config.getPhysicalName).toSet
 
-  def registerMetrics {
-    debug("Registering metrics for taskName: %s" format taskName)
-
-    reporters.values.foreach(_.register(metrics.source, metrics.registry))
-  }
-
   def registerOffsets {
     debug("Registering offsets for taskName: %s" format taskName)
     offsetManager.register(taskName, systemStreamPartitions)
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index ad9637d..da61a35 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -113,7 +113,8 @@ public class ContainerStorageManager {
   private static final Logger LOG = LoggerFactory.getLogger(ContainerStorageManager.class);
   private static final String RESTORE_THREAD_NAME = "Samza Restore Thread-%d";
   private static final String SIDEINPUTS_FLUSH_THREAD_NAME = "SideInputs Flush Thread";
-  private static final String SIDEINPUTS_METRICS_NAME = "samza-container-%s-sideinputs";
+  private static final String SIDEINPUTS_METRICS_PREFIX = "side-inputs-";
+  // We use a prefix to differentiate the SystemConsumersMetrics for side-inputs from the ones in SamzaContainer
 
   /** Maps containing relevant per-task objects */
   private final Map<TaskName, Map<String, StorageEngine>> taskStores;
@@ -215,7 +216,7 @@ public class ContainerStorageManager {
     this.storeConsumers = createStoreIndexedMap(this.changelogSystemStreams, storeSystemConsumers);
 
     // creating task restore managers
-    this.taskRestoreManagers = createTaskRestoreManagers(systemAdmins, clock);
+    this.taskRestoreManagers = createTaskRestoreManagers(systemAdmins, clock, this.samzaContainerMetrics);
 
     // create side input storage managers
     sideInputStorageManagers = createSideInputStorageManagers(clock);
@@ -229,15 +230,15 @@ public class ContainerStorageManager {
       scala.collection.immutable.Map<SystemStream, SystemStreamMetadata> inputStreamMetadata = streamMetadataCache.getStreamMetadata(JavaConversions.asScalaSet(
           this.sideInputSystemStreams.values().stream().flatMap(Set::stream).collect(Collectors.toSet())).toSet(), false);
 
-      SystemConsumersMetrics systemConsumersMetrics = new SystemConsumersMetrics(
-          new MetricsRegistryMap(String.format(SIDEINPUTS_METRICS_NAME, containerModel.getId())));
+      SystemConsumersMetrics sideInputSystemConsumersMetrics = new SystemConsumersMetrics(samzaContainerMetrics.registry(), SIDEINPUTS_METRICS_PREFIX);
+      // we use the same registry as samza-container-metrics
 
       MessageChooser chooser = DefaultChooser.apply(inputStreamMetadata, new RoundRobinChooserFactory(), config,
-          systemConsumersMetrics.registry(), systemAdmins);
+          sideInputSystemConsumersMetrics.registry(), systemAdmins);
 
       sideInputSystemConsumers =
           new SystemConsumers(chooser, ScalaJavaUtil.toScalaMap(this.sideInputConsumers), serdeManager,
-              systemConsumersMetrics, SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(), SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(),
+              sideInputSystemConsumersMetrics, SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(), SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(),
               SystemConsumers.DEFAULT_POLL_INTERVAL_MS(), ScalaJavaUtil.toScalaFunction(() -> System.nanoTime()));
     }
 
@@ -336,11 +337,12 @@ public class ContainerStorageManager {
     return storeConsumers;
   }
 
-  private Map<TaskName, TaskRestoreManager> createTaskRestoreManagers(SystemAdmins systemAdmins, Clock clock) {
+  private Map<TaskName, TaskRestoreManager> createTaskRestoreManagers(SystemAdmins systemAdmins, Clock clock, SamzaContainerMetrics samzaContainerMetrics) {
     Map<TaskName, TaskRestoreManager> taskRestoreManagers = new HashMap<>();
     containerModel.getTasks().forEach((taskName, taskModel) -> {
         taskRestoreManagers.put(taskName,
             new TaskRestoreManager(taskModel, changelogSystemStreams, getNonSideInputStores(taskName), systemAdmins, clock));
+        samzaContainerMetrics.addStoresRestorationGauge(taskName);
       });
     return taskRestoreManagers;
   }
@@ -573,7 +575,6 @@ public class ContainerStorageManager {
     return this.sideInputStorageManagers.values().stream().collect(Collectors.toSet());
   }
 
-
   public void start() throws SamzaException {
     restoreStores();
     if (sideInputsPresent()) {
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
index 43d381b..afdce08 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
@@ -19,12 +19,13 @@
 
 package org.apache.samza.system
 
-import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.metrics.Counter
 import org.apache.samza.metrics.MetricsHelper
+import org.apache.samza.metrics.ReadableMetricsRegistry
 
-class SystemConsumersMetrics(val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
+class SystemConsumersMetrics(val registry: ReadableMetricsRegistry = new MetricsRegistryMap,
+  val prefix: String = "") extends MetricsHelper {
   val choseNull = newCounter("chose-null")
   val choseObject = newCounter("chose-object")
   val deserializationError = newCounter("deserialization error")
@@ -55,6 +56,8 @@ class SystemConsumersMetrics(val registry: MetricsRegistry = new MetricsRegistry
     }
   }
 
+  override def getPrefix: String = prefix
+
   def registerSystemStreamPartition(systemStreamPartition: SystemStreamPartition) {
     systemStreamMessagesChosen += systemStreamPartition -> newCounter("%s-%s-%d-messages-chosen" format (systemStreamPartition.getSystem, systemStreamPartition.getStream, systemStreamPartition.getPartition.getPartitionId))
   }
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
index acaecdb..48f8619 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
@@ -97,7 +97,6 @@ public class TestAsyncRunLoop {
         manager,
         null,
         null,
-        null,
         sspSet,
         new TaskInstanceExceptionHandler(taskInstanceMetrics, new scala.collection.immutable.HashSet<String>()),
         null,
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index d1f60bc..e75fe54 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -280,6 +280,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
     this.samzaContainer = new SamzaContainer(
       this.config,
       Map(TASK_NAME -> this.taskInstance),
+      Map(TASK_NAME -> new TaskInstanceMetrics),
       this.runLoop,
       this.systemAdmins,
       this.consumerMultiplexer,
diff --git a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
index 9bb485a..3ff651b 100644
--- a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
+++ b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
@@ -63,6 +63,7 @@ object StreamProcessorTestUtils {
     val container = new SamzaContainer(
       config = config,
       taskInstances = Map(taskName -> taskInstance),
+      taskInstanceMetrics = Map(taskName -> new TaskInstanceMetrics),
       runLoop = mockRunloop,
       systemAdmins = adminMultiplexer,
       consumerMultiplexer = consumerMultiplexer,