You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ca...@apache.org on 2021/11/08 20:54:16 UTC

[samza] branch master updated: SAMZA-2706: Clean up specific handling of diagnostics-specific metrics reporter (#1550)

This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 3054a86  SAMZA-2706: Clean up specific handling of diagnostics-specific metrics reporter (#1550)
3054a86 is described below

commit 3054a86ecca7fd1909fd230042c698507e4eba05
Author: Cameron Lee <ca...@linkedin.com>
AuthorDate: Mon Nov 8 12:54:12 2021 -0800

    SAMZA-2706: Clean up specific handling of diagnostics-specific metrics reporter (#1550)
    
    API changes:
    1. In the YARN application master, the diagnosticsreporter metrics reporter is now given a processorId of "ApplicationMaster" instead of "samza-container-ApplicationMaster".
    2. When using MetricsReporterLoader.getMetricsReporters, if diagnosticsreporter is in metrics.reporters, then the diagnosticsreporter metrics reporter is always created.
    3. The diagnosticsreporter metrics reporter may still be created even if job.diagnostics.enabled is set to false. Note that Samza will only automatically create the stream for the diagnosticsreporter metrics reporter if diagnostics is enabled (since the DiagnosticsManager also uses that same stream), so if there is a case in which diagnosticsreporter metrics reporter is needed while diagnostics is disabled, then the stream for the reporter needs to be created through some other means.
---
 .../clustermanager/ContainerProcessManager.java    | 24 ++++++------------
 .../apache/samza/processor/StreamProcessor.java    | 16 +++---------
 .../apache/samza/runtime/ContainerLaunchUtil.java  | 14 +++--------
 .../org/apache/samza/util/DiagnosticsUtil.java     | 29 +++++-----------------
 .../apache/samza/util/MetricsReporterLoader.java   | 12 +--------
 .../org/apache/samza/util/TestDiagnosticsUtil.java | 17 +++----------
 6 files changed, 26 insertions(+), 86 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index 143e0b3..4ef9c68 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -44,13 +44,11 @@ import org.apache.samza.metrics.ContainerProcessManagerMetrics;
 import org.apache.samza.metrics.JvmMetrics;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.MetricsReporter;
-import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
 import org.apache.samza.util.DiagnosticsUtil;
 import org.apache.samza.util.MetricsReporterLoader;
 import org.apache.samza.util.ReflectionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -105,7 +103,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
   // The ContainerManager manages control actions for both active & standby containers
   private final ContainerManager containerManager;
 
-  private final Option<DiagnosticsManager> diagnosticsManager;
+  private final Optional<DiagnosticsManager> diagnosticsManager;
 
   private final LocalityManager localityManager;
 
@@ -161,15 +159,9 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
     String jobName = new JobConfig(config).getName().get();
     String jobId = new JobConfig(config).getJobId();
     Optional<String> execEnvContainerId = Optional.ofNullable(System.getenv(EXEC_ENV_CONTAINER_ID_SYS_PROPERTY));
-    Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> diagnosticsManagerReporterPair =
-        DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, state.jobModelManager.jobModel(), METRICS_SOURCE_NAME, execEnvContainerId, config);
-
-    if (diagnosticsManagerReporterPair.isPresent()) {
-      diagnosticsManager = Option.apply(diagnosticsManagerReporterPair.get().getKey());
-      metricsReporters.put(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS, diagnosticsManagerReporterPair.get().getValue());
-    } else {
-      diagnosticsManager = Option.empty();
-    }
+    this.diagnosticsManager =
+        DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, state.jobModelManager.jobModel(), METRICS_SOURCE_NAME,
+            execEnvContainerId, config);
 
     this.localityManager = localityManager;
     // Wire all metrics to all reporters
@@ -201,7 +193,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
 
     this.clusterResourceManager = resourceManager;
     this.containerManager = containerManager;
-    this.diagnosticsManager = Option.empty();
+    this.diagnosticsManager = Optional.empty();
     this.localityManager = localityManager;
     this.containerAllocator = allocator.orElseGet(
       () -> new ContainerAllocator(this.clusterResourceManager, clusterManagerConfig, state,
@@ -240,7 +232,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
       metricsReporters.values().forEach(reporter -> reporter.start());
     }
 
-    if (diagnosticsManager.isDefined()) {
+    if (diagnosticsManager.isPresent()) {
       diagnosticsManager.get().start();
     }
 
@@ -295,7 +287,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
       Thread.currentThread().interrupt();
     }
 
-    if (diagnosticsManager.isDefined()) {
+    if (diagnosticsManager.isPresent()) {
       try {
         diagnosticsManager.get().stop();
       } catch (InterruptedException e) {
@@ -398,7 +390,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
         onResourceCompletedWithUnknownStatus(resourceStatus, containerId, processorId, exitStatus);
     }
 
-    if (diagnosticsManager.isDefined()) {
+    if (diagnosticsManager.isPresent()) {
       diagnosticsManager.get().addProcessorStopEvent(processorId, resourceStatus.getContainerId(), hostName, exitStatus);
     }
   }
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index ea5308f..92eca0b 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -31,12 +31,10 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
-import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.SamzaContainer;
 import org.apache.samza.container.SamzaContainerListener;
@@ -55,7 +53,6 @@ import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.MetricsReporter;
-import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
 import org.apache.samza.runtime.ProcessorLifecycleListener;
 import org.apache.samza.startpoint.StartpointManager;
 import org.apache.samza.task.TaskFactory;
@@ -381,17 +378,11 @@ public class StreamProcessor {
 
   @VisibleForTesting
   SamzaContainer createSamzaContainer(String processorId, JobModel jobModel) {
-
-    // Creating diagnostics manager and reporter, and wiring it respectively
+    // Creating diagnostics manager and wiring it respectively
     String jobName = new JobConfig(config).getName().get();
     String jobId = new JobConfig(config).getJobId();
-    Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> diagnosticsManagerReporterPair =
+    Optional<DiagnosticsManager> diagnosticsManager =
         DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, processorId, Optional.empty(), config);
-    Option<DiagnosticsManager> diagnosticsManager = Option.empty();
-    if (diagnosticsManagerReporterPair.isPresent()) {
-      diagnosticsManager = Option.apply(diagnosticsManagerReporterPair.get().getKey());
-      this.customMetricsReporter.put(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS, diagnosticsManagerReporterPair.get().getValue());
-    }
 
     // Metadata store lifecycle managed outside of the SamzaContainer.
     // All manager lifecycles are managed in the SamzaContainer including startpointManager
@@ -415,7 +406,8 @@ public class StreamProcessor {
         metricsRegistryMap, this.taskFactory, JobContextImpl.fromConfigWithDefaults(this.config, jobModel),
         Option.apply(this.applicationDefinedContainerContextFactoryOptional.orElse(null)),
         Option.apply(this.applicationDefinedTaskContextFactoryOptional.orElse(null)),
-        Option.apply(this.externalContextOptional.orElse(null)), null, startpointManager, diagnosticsManager);
+        Option.apply(this.externalContextOptional.orElse(null)), null, startpointManager,
+        Option.apply(diagnosticsManager.orElse(null)));
   }
 
   private static JobCoordinator createJobCoordinator(Config config, String processorId, MetricsRegistry metricsRegistry, MetadataStore metadataStore) {
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index a114e7b..89ec196 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -22,13 +22,11 @@ package org.apache.samza.runtime;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.descriptors.ApplicationDescriptor;
 import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.container.ContainerHeartbeatMonitor;
 import org.apache.samza.container.ExecutionContainerIdManager;
@@ -48,7 +46,6 @@ import org.apache.samza.logging.LoggingContextHolder;
 import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.MetricsReporter;
-import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
 import org.apache.samza.startpoint.StartpointManager;
 import org.apache.samza.task.TaskFactory;
 import org.apache.samza.task.TaskFactoryUtil;
@@ -121,12 +118,9 @@ public class ContainerLaunchUtil {
       Map<String, MetricsReporter> metricsReporters = loadMetricsReporters(appDesc, containerId, config);
 
       // Creating diagnostics manager and reporter, and wiring it respectively
-      Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> diagnosticsManagerReporterPair = DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, containerId, execEnvContainerIdOptional, config);
-      Option<DiagnosticsManager> diagnosticsManager = Option.empty();
-      if (diagnosticsManagerReporterPair.isPresent()) {
-        diagnosticsManager = Option.apply(diagnosticsManagerReporterPair.get().getKey());
-        metricsReporters.put(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS, diagnosticsManagerReporterPair.get().getValue());
-      }
+      Optional<DiagnosticsManager> diagnosticsManager =
+          DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, containerId, execEnvContainerIdOptional,
+              config);
       MetricsRegistryMap metricsRegistryMap = new MetricsRegistryMap();
 
       SamzaContainer container = SamzaContainer$.MODULE$.apply(
@@ -140,7 +134,7 @@ public class ContainerLaunchUtil {
           Option.apply(externalContextOptional.orElse(null)),
           localityManager,
           startpointManager,
-          diagnosticsManager);
+          Option.apply(diagnosticsManager.orElse(null)));
 
       ProcessorLifecycleListener processorLifecycleListener = appDesc.getProcessorLifecycleListenerFactory()
           .createInstance(new ProcessorContext() { }, config);
diff --git a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
index b1e4206..f141d92 100644
--- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
@@ -21,8 +21,6 @@ package org.apache.samza.util;
 import java.io.File;
 import java.time.Duration;
 import java.util.Optional;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
@@ -36,11 +34,9 @@ import org.apache.samza.config.TaskConfig;
 import org.apache.samza.diagnostics.DiagnosticsManager;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.MetricsRegistryMap;
-import org.apache.samza.metrics.MetricsReporterFactory;
 import org.apache.samza.metrics.reporter.Metrics;
 import org.apache.samza.metrics.reporter.MetricsHeader;
 import org.apache.samza.metrics.reporter.MetricsSnapshot;
-import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
 import org.apache.samza.runtime.LocalContainerRunner;
 import org.apache.samza.serializers.JsonSerde;
 import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
@@ -95,27 +91,14 @@ public class DiagnosticsUtil {
    * if diagnostics is enabled.
    * execEnvContainerId is the ID assigned to the container by the cluster manager (e.g., YARN).
    */
-  public static Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> buildDiagnosticsManager(String jobName,
+  public static Optional<DiagnosticsManager> buildDiagnosticsManager(String jobName,
       String jobId, JobModel jobModel, String containerId, Optional<String> execEnvContainerId, Config config) {
 
     JobConfig jobConfig = new JobConfig(config);
     MetricsConfig metricsConfig = new MetricsConfig(config);
-    Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> diagnosticsManagerReporterPair = Optional.empty();
+    Optional<DiagnosticsManager> diagnosticsManagerOptional = Optional.empty();
 
     if (jobConfig.getDiagnosticsEnabled()) {
-
-      // Diagnostics MetricReporter init
-      String diagnosticsReporterName = MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS;
-      String diagnosticsFactoryClassName = metricsConfig.getMetricsFactoryClass(diagnosticsReporterName)
-          .orElseThrow(() -> new SamzaException(
-              String.format("Diagnostics reporter %s missing .class config", diagnosticsReporterName)));
-      MetricsReporterFactory metricsReporterFactory =
-          ReflectionUtil.getObj(diagnosticsFactoryClassName, MetricsReporterFactory.class);
-      MetricsSnapshotReporter diagnosticsReporter =
-          (MetricsSnapshotReporter) metricsReporterFactory.getMetricsReporter(diagnosticsReporterName,
-              "samza-container-" + containerId, config);
-
-      // DiagnosticsManager init
       ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
       int containerMemoryMb = clusterManagerConfig.getContainerMemoryMb();
       int containerNumCores = clusterManagerConfig.getNumCores();
@@ -125,12 +108,12 @@ public class DiagnosticsUtil {
       String samzaVersion = Util.getSamzaVersion();
       String hostName = Util.getLocalHost().getHostName();
       Optional<String> diagnosticsReporterStreamName =
-          metricsConfig.getMetricsSnapshotReporterStream(diagnosticsReporterName);
+          metricsConfig.getMetricsSnapshotReporterStream(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS);
 
       if (!diagnosticsReporterStreamName.isPresent()) {
         throw new ConfigException(
             "Missing required config: " + String.format(MetricsConfig.METRICS_SNAPSHOT_REPORTER_STREAM,
-                diagnosticsReporterName));
+                MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS));
       }
       SystemStream diagnosticsSystemStream = StreamUtil.getSystemStreamFromNames(diagnosticsReporterStreamName.get());
 
@@ -153,10 +136,10 @@ public class DiagnosticsUtil {
               diagnosticsSystemStream, systemProducer,
               Duration.ofMillis(new TaskConfig(config).getShutdownMs()), jobConfig.getAutosizingEnabled(), config);
 
-      diagnosticsManagerReporterPair = Optional.of(new ImmutablePair<>(diagnosticsManager, diagnosticsReporter));
+      diagnosticsManagerOptional = Optional.of(diagnosticsManager);
     }
 
-    return diagnosticsManagerReporterPair;
+    return diagnosticsManagerOptional;
   }
 
   public static void createDiagnosticsStream(Config config) {
diff --git a/samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java b/samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java
index 4e50efa..55baa13 100644
--- a/samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java
+++ b/samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java
@@ -21,7 +21,6 @@ package org.apache.samza.util;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.metrics.MetricsReporter;
@@ -38,16 +37,7 @@ public class MetricsReporterLoader {
 
   public static Map<String, MetricsReporter> getMetricsReporters(MetricsConfig metricsConfig, String containerName) {
     Map<String, MetricsReporter> metricsReporters = new HashMap<>();
-
-    String diagnosticsReporterName = MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS;
-
-    // Exclude creation of diagnostics-reporter, because it is created manually in SamzaContainer (to allow sharing of
-    // sysProducer between reporter and diagnosticsManager
-    List<String> metricsReporterNames = metricsConfig.getMetricReporterNames()
-        .stream()
-        .filter(reporterName -> !reporterName.equals(diagnosticsReporterName))
-        .collect(Collectors.toList());
-
+    List<String> metricsReporterNames = metricsConfig.getMetricReporterNames();
     for (String metricsReporterName : metricsReporterNames) {
       String metricsFactoryClassName = metricsConfig.getMetricsFactoryClass(metricsReporterName)
           .orElseThrow(() -> new SamzaException(
diff --git a/samza-core/src/test/java/org/apache/samza/util/TestDiagnosticsUtil.java b/samza-core/src/test/java/org/apache/samza/util/TestDiagnosticsUtil.java
index d17dac1..e10c551 100644
--- a/samza-core/src/test/java/org/apache/samza/util/TestDiagnosticsUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/util/TestDiagnosticsUtil.java
@@ -22,7 +22,6 @@ package org.apache.samza.util;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
@@ -31,8 +30,6 @@ import org.apache.samza.config.SystemConfig;
 import org.apache.samza.diagnostics.DiagnosticsManager;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.metrics.MetricsReporterFactory;
-import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
 import org.apache.samza.system.SystemFactory;
 import org.apache.samza.system.SystemProducer;
 import org.junit.Assert;
@@ -48,7 +45,6 @@ import static org.mockito.Mockito.*;
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({ReflectionUtil.class})
 public class TestDiagnosticsUtil {
-
   private static final String STREAM_NAME = "someStreamName";
   private static final String JOB_NAME = "someJob";
   private static final String JOB_ID = "someId";
@@ -58,27 +54,20 @@ public class TestDiagnosticsUtil {
   public static final String SYSTEM_FACTORY = "com.foo.system.SomeSystemFactory";
 
   @Test
-  public void testBuildDiagnosticsManagerReturnsConfiguredReporter() {
+  public void testBuildDiagnosticsManager() {
     Config config = new MapConfig(buildTestConfigs());
     JobModel mockJobModel = mock(JobModel.class);
     SystemFactory systemFactory = mock(SystemFactory.class);
     SystemProducer mockProducer = mock(SystemProducer.class);
-    MetricsReporterFactory metricsReporterFactory = mock(MetricsReporterFactory.class);
-    MetricsSnapshotReporter mockReporter = mock(MetricsSnapshotReporter.class);
-
     when(systemFactory.getProducer(anyString(), any(Config.class), any(MetricsRegistry.class), anyString())).thenReturn(mockProducer);
-    when(metricsReporterFactory.getMetricsReporter(anyString(), anyString(), any(Config.class))).thenReturn(
-        mockReporter);
     PowerMockito.mockStatic(ReflectionUtil.class);
-    when(ReflectionUtil.getObj(REPORTER_FACTORY, MetricsReporterFactory.class)).thenReturn(metricsReporterFactory);
     when(ReflectionUtil.getObj(SYSTEM_FACTORY, SystemFactory.class)).thenReturn(systemFactory);
 
-    Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> managerReporterPair =
+    Optional<DiagnosticsManager> diagnosticsManager =
         DiagnosticsUtil.buildDiagnosticsManager(JOB_NAME, JOB_ID, mockJobModel, CONTAINER_ID, Optional.of(ENV_ID),
             config);
 
-    Assert.assertTrue(managerReporterPair.isPresent());
-    Assert.assertEquals(mockReporter, managerReporterPair.get().getValue());
+    Assert.assertTrue(diagnosticsManager.isPresent());
   }
 
   private Map<String, String> buildTestConfigs() {