You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ra...@apache.org on 2020/08/05 19:08:10 UTC

[samza] branch master updated: SAMZA-2561: Add config map to DiagnosticsManager (#1406)

This is an automated email from the ASF dual-hosted git repository.

rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b148c9  SAMZA-2561: Add config map to DiagnosticsManager (#1406)
2b148c9 is described below

commit 2b148c924532dd6ee10acc3b169e5ccbf5953618
Author: Pawas Chhokra <pa...@gmail.com>
AuthorDate: Wed Aug 5 12:08:03 2020 -0700

    SAMZA-2561: Add config map to DiagnosticsManager (#1406)
    
    * Add config map to MetricsHeader
    
    * Address review
    
    * Address review
    
    * Address review
    
    * Address review
---
 .../java/org/apache/samza/util/DiagnosticsUtil.java  |  2 +-
 .../apache/samza/diagnostics/DiagnosticsManager.java | 14 +++++++++++---
 .../samza/diagnostics/DiagnosticsStreamMessage.java  | 20 ++++++++++++++++++++
 .../samza/diagnostics/TestDiagnosticsManager.java    | 15 +++++++++++----
 .../diagnostics/TestDiagnosticsStreamMessage.java    |  7 +++++++
 5 files changed, 50 insertions(+), 8 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
index 0b88fa0..724614b 100644
--- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
@@ -151,7 +151,7 @@ public class DiagnosticsUtil {
               new StorageConfig(config).getNumPersistentStores(), maxHeapSizeBytes, containerThreadPoolSize,
               containerId, execEnvContainerId.orElse(""), taskClassVersion, samzaVersion, hostName,
               diagnosticsSystemStream, systemProducer,
-              Duration.ofMillis(new TaskConfig(config).getShutdownMs()), jobConfig.getAutosizingEnabled());
+              Duration.ofMillis(new TaskConfig(config).getShutdownMs()), jobConfig.getAutosizingEnabled(), config);
 
       diagnosticsManagerReporterPair = Optional.of(new ImmutablePair<>(diagnosticsManager, diagnosticsReporter));
     }
diff --git a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
index f77dab8..93ca566 100644
--- a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import org.apache.samza.config.Config;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
 import org.apache.samza.system.OutgoingMessageEnvelope;
@@ -68,6 +69,7 @@ public class DiagnosticsManager {
   private final int containerThreadPoolSize;
   private final Map<String, ContainerModel> containerModels;
   private final boolean autosizingEnabled;
+  private final Config config;
   private boolean jobParamsEmitted = false;
 
   private final SystemProducer systemProducer; // SystemProducer for writing diagnostics data
@@ -93,12 +95,14 @@ public class DiagnosticsManager {
       String hostname,
       SystemStream diagnosticSystemStream,
       SystemProducer systemProducer,
-      Duration terminationDuration, boolean autosizingEnabled) {
+      Duration terminationDuration,
+      boolean autosizingEnabled,
+      Config config) {
 
     this(jobName, jobId, containerModels, containerMemoryMb, containerNumCores, numPersistentStores, maxHeapSizeBytes, containerThreadPoolSize,
         containerId, executionEnvContainerId, taskClassVersion, samzaVersion, hostname, diagnosticSystemStream, systemProducer,
         terminationDuration, Executors.newSingleThreadScheduledExecutor(
-            new ThreadFactoryBuilder().setNameFormat(PUBLISH_THREAD_NAME).setDaemon(true).build()), autosizingEnabled);
+            new ThreadFactoryBuilder().setNameFormat(PUBLISH_THREAD_NAME).setDaemon(true).build()), autosizingEnabled, config);
   }
 
   @VisibleForTesting
@@ -118,7 +122,9 @@ public class DiagnosticsManager {
       SystemStream diagnosticSystemStream,
       SystemProducer systemProducer,
       Duration terminationDuration,
-      ScheduledExecutorService executorService, boolean autosizingEnabled) {
+      ScheduledExecutorService executorService,
+      boolean autosizingEnabled,
+      Config config) {
     this.jobName = jobName;
     this.jobId = jobId;
     this.containerModels = containerModels;
@@ -140,6 +146,7 @@ public class DiagnosticsManager {
     this.exceptions = new BoundedList<>("exceptions"); // Create a BoundedList with default size and time parameters
     this.scheduler = executorService;
     this.autosizingEnabled = autosizingEnabled;
+    this.config = config;
 
     resetTime = Instant.now();
     this.systemProducer.register(getClass().getSimpleName());
@@ -208,6 +215,7 @@ public class DiagnosticsManager {
           diagnosticsStreamMessage.addMaxHeapSize(maxHeapSizeBytes);
           diagnosticsStreamMessage.addContainerThreadPoolSize(containerThreadPoolSize);
           diagnosticsStreamMessage.addAutosizingEnabled(autosizingEnabled);
+          diagnosticsStreamMessage.addConfig(config);
         }
 
         // Add stop event list to the message
diff --git a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java
index 15cce03..bea7ce2 100644
--- a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java
+++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java
@@ -24,6 +24,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.metrics.reporter.Metrics;
 import org.apache.samza.metrics.reporter.MetricsHeader;
@@ -60,6 +62,7 @@ public class DiagnosticsStreamMessage {
   private static final String CONTAINER_THREAD_POOL_SIZE_METRIC_NAME = "containerThreadPoolSize";
   private static final String CONTAINER_MODELS_METRIC_NAME = "containerModels";
   private static final String AUTOSIZING_ENABLED_METRIC_NAME = "autosizingEnabled";
+  private static final String CONFIG_METRIC_NAME = "config";
 
   private final MetricsHeader metricsHeader;
   private final Map<String, Map<String, Object>> metricsMessage;
@@ -156,6 +159,14 @@ public class DiagnosticsStreamMessage {
   }
 
   /**
+   * Add the job's config to the message.
+   * @param config the config to add.
+   */
+  public void addConfig(Config config) {
+    addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, CONFIG_METRIC_NAME, (Map<String, String>) config);
+  }
+
+  /**
    * Convert this message into a {@link MetricsSnapshot}, useful for serde-deserde using {@link org.apache.samza.serializers.MetricsSnapshotSerde}.
    * @return
    */
@@ -228,6 +239,14 @@ public class DiagnosticsStreamMessage {
     return (Boolean) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, AUTOSIZING_ENABLED_METRIC_NAME);
   }
 
+  /**
+   * This method gets the config of the job from the MetricsMessage.
+   * @return the config of the job.
+   */
+  public Config getConfig() {
+    return new MapConfig((Map<String, String>) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, CONFIG_METRIC_NAME));
+  }
+
   // Helper method to get a {@link DiagnosticsStreamMessage} from a {@link MetricsSnapshot}.
   //   * This is typically used when deserializing messages from a diagnostics-stream.
   //   * @param metricsSnapshot
@@ -254,6 +273,7 @@ public class DiagnosticsStreamMessage {
       diagnosticsStreamMessage.addContainerThreadPoolSize((Integer) diagnosticsManagerGroupMap.get(CONTAINER_THREAD_POOL_SIZE_METRIC_NAME));
       diagnosticsStreamMessage.addProcessorStopEvents((List<ProcessorStopEvent>) diagnosticsManagerGroupMap.get(STOP_EVENT_LIST_METRIC_NAME));
       diagnosticsStreamMessage.addAutosizingEnabled((Boolean) diagnosticsManagerGroupMap.get(AUTOSIZING_ENABLED_METRIC_NAME));
+      diagnosticsStreamMessage.addConfig(new MapConfig((Map<String, String>) diagnosticsManagerGroupMap.get(CONFIG_METRIC_NAME)));
     }
 
     if (containerMetricsGroupMap != null && containerMetricsGroupMap.containsKey(EXCEPTION_LIST_METRIC_NAME)) {
diff --git a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java
index 6429a54..8ff58eb 100644
--- a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java
+++ b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.diagnostics;
 
+import com.google.common.collect.ImmutableMap;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -28,6 +29,8 @@ import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.metrics.reporter.MetricsSnapshot;
 import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
@@ -59,6 +62,9 @@ public class TestDiagnosticsManager {
   private int numPersistentStores = 2;
   private int containerNumCores = 2;
   private boolean autosizingEnabled = false;
+  private Config config = new MapConfig(ImmutableMap.of("job.name", jobName, "job.id", jobId,
+      "cluster-manager.container.memory.mb", "1024", "cluster-manager.container. cpu.cores", "1",
+      "cluster-manager.container.retry.count", "8"));
   private Map<String, ContainerModel> containerModels = TestDiagnosticsStreamMessage.getSampleContainerModels();
   private Collection<DiagnosticsExceptionEvent> exceptionEventList = TestDiagnosticsStreamMessage.getExceptionList();
 
@@ -80,7 +86,7 @@ public class TestDiagnosticsManager {
     this.diagnosticsManager =
         new DiagnosticsManager(jobName, jobId, containerModels, containerMb, containerNumCores, numPersistentStores, maxHeapSize, containerThreadPoolSize,
             "0", executionEnvContainerId, taskClassVersion, samzaVersion, hostname, diagnosticsSystemStream,
-            mockSystemProducer, Duration.ofSeconds(1), mockExecutorService, autosizingEnabled);
+            mockSystemProducer, Duration.ofSeconds(1), mockExecutorService, autosizingEnabled, config);
 
     exceptionEventList.forEach(
       diagnosticsExceptionEvent -> this.diagnosticsManager.addExceptionEvent(diagnosticsExceptionEvent));
@@ -95,7 +101,7 @@ public class TestDiagnosticsManager {
         new DiagnosticsManager(jobName, jobId, containerModels, containerMb, containerNumCores, numPersistentStores,
             maxHeapSize, containerThreadPoolSize, "0", executionEnvContainerId, taskClassVersion, samzaVersion,
             hostname, diagnosticsSystemStream, mockSystemProducer, Duration.ofSeconds(1), mockExecutorService,
-            autosizingEnabled);
+            autosizingEnabled, config);
 
     diagnosticsManager.start();
 
@@ -114,7 +120,7 @@ public class TestDiagnosticsManager {
         new DiagnosticsManager(jobName, jobId, containerModels, containerMb, containerNumCores, numPersistentStores,
             maxHeapSize, containerThreadPoolSize, "0", executionEnvContainerId, taskClassVersion, samzaVersion,
             hostname, diagnosticsSystemStream, mockSystemProducer, terminationDuration, mockExecutorService,
-            autosizingEnabled);
+            autosizingEnabled, config);
 
     diagnosticsManager.stop();
 
@@ -134,7 +140,7 @@ public class TestDiagnosticsManager {
         new DiagnosticsManager(jobName, jobId, containerModels, containerMb, containerNumCores, numPersistentStores,
             maxHeapSize, containerThreadPoolSize, "0", executionEnvContainerId, taskClassVersion, samzaVersion,
             hostname, diagnosticsSystemStream, mockSystemProducer, terminationDuration, mockExecutorService,
-            autosizingEnabled);
+            autosizingEnabled, config);
 
     diagnosticsManager.stop();
 
@@ -272,6 +278,7 @@ public class TestDiagnosticsManager {
     Assert.assertEquals(containerNumCores, diagnosticsStreamMessage.getContainerNumCores().intValue());
     Assert.assertEquals(numPersistentStores, diagnosticsStreamMessage.getNumPersistentStores().intValue());
     Assert.assertEquals(autosizingEnabled, diagnosticsStreamMessage.getAutosizingEnabled());
+    Assert.assertEquals(config, diagnosticsStreamMessage.getConfig());
   }
 
   private class MockSystemProducer implements SystemProducer {
diff --git a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java
index cd506b2..72b1b5f 100644
--- a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java
+++ b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.diagnostics;
 
+import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -26,6 +27,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.TaskModel;
@@ -46,6 +49,7 @@ public class TestDiagnosticsStreamMessage {
   private final String hostname = "sample host name";
   private final long timestamp = System.currentTimeMillis();
   private final long resetTimestamp = System.currentTimeMillis();
+  private final Config config = new MapConfig(ImmutableMap.of("job.name", jobName, "job.id", jobId));
 
   private DiagnosticsStreamMessage getDiagnosticsStreamMessage() {
     DiagnosticsStreamMessage diagnosticsStreamMessage =
@@ -55,6 +59,7 @@ public class TestDiagnosticsStreamMessage {
     diagnosticsStreamMessage.addContainerMb(1024);
     diagnosticsStreamMessage.addContainerNumCores(2);
     diagnosticsStreamMessage.addNumPersistentStores(3);
+    diagnosticsStreamMessage.addConfig(config);
 
     diagnosticsStreamMessage.addProcessorStopEvents(getProcessorStopEventList());
     return diagnosticsStreamMessage;
@@ -107,6 +112,7 @@ public class TestDiagnosticsStreamMessage {
     Assert.assertEquals(1024, (int) diagnosticsStreamMessage.getContainerMb());
     Assert.assertEquals(2, (int) diagnosticsStreamMessage.getContainerNumCores());
     Assert.assertEquals(3, (int) diagnosticsStreamMessage.getNumPersistentStores());
+    Assert.assertEquals(config, diagnosticsStreamMessage.getConfig());
     Assert.assertEquals(exceptionEventList, diagnosticsStreamMessage.getExceptionEvents());
     Assert.assertEquals(getSampleContainerModels(), diagnosticsStreamMessage.getContainerModels());
     Assert.assertEquals(diagnosticsStreamMessage.getProcessorStopEvents(), getProcessorStopEventList());
@@ -139,6 +145,7 @@ public class TestDiagnosticsStreamMessage {
     Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("containerNumCores"));
     Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("containerMemoryMb"));
     Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("stopEvents"));
+    Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("config"));
 
     DiagnosticsStreamMessage convertedDiagnosticsStreamMessage =
         DiagnosticsStreamMessage.convertToDiagnosticsStreamMessage(metricsSnapshot);