You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ra...@apache.org on 2020/08/05 19:08:10 UTC
[samza] branch master updated: SAMZA-2561: Add config map to DiagnosticsManager (#1406)
This is an automated email from the ASF dual-hosted git repository.
rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 2b148c9 SAMZA-2561: Add config map to DiagnosticsManager (#1406)
2b148c9 is described below
commit 2b148c924532dd6ee10acc3b169e5ccbf5953618
Author: Pawas Chhokra <pa...@gmail.com>
AuthorDate: Wed Aug 5 12:08:03 2020 -0700
SAMZA-2561: Add config map to DiagnosticsManager (#1406)
* Add config map to MetricsHeader
* Address review
* Address review
* Address review
* Address review
---
.../java/org/apache/samza/util/DiagnosticsUtil.java | 2 +-
.../apache/samza/diagnostics/DiagnosticsManager.java | 14 +++++++++++---
.../samza/diagnostics/DiagnosticsStreamMessage.java | 20 ++++++++++++++++++++
.../samza/diagnostics/TestDiagnosticsManager.java | 15 +++++++++++----
.../diagnostics/TestDiagnosticsStreamMessage.java | 7 +++++++
5 files changed, 50 insertions(+), 8 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
index 0b88fa0..724614b 100644
--- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
@@ -151,7 +151,7 @@ public class DiagnosticsUtil {
new StorageConfig(config).getNumPersistentStores(), maxHeapSizeBytes, containerThreadPoolSize,
containerId, execEnvContainerId.orElse(""), taskClassVersion, samzaVersion, hostName,
diagnosticsSystemStream, systemProducer,
- Duration.ofMillis(new TaskConfig(config).getShutdownMs()), jobConfig.getAutosizingEnabled());
+ Duration.ofMillis(new TaskConfig(config).getShutdownMs()), jobConfig.getAutosizingEnabled(), config);
diagnosticsManagerReporterPair = Optional.of(new ImmutablePair<>(diagnosticsManager, diagnosticsReporter));
}
diff --git a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
index f77dab8..93ca566 100644
--- a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import org.apache.samza.config.Config;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
import org.apache.samza.system.OutgoingMessageEnvelope;
@@ -68,6 +69,7 @@ public class DiagnosticsManager {
private final int containerThreadPoolSize;
private final Map<String, ContainerModel> containerModels;
private final boolean autosizingEnabled;
+ private final Config config;
private boolean jobParamsEmitted = false;
private final SystemProducer systemProducer; // SystemProducer for writing diagnostics data
@@ -93,12 +95,14 @@ public class DiagnosticsManager {
String hostname,
SystemStream diagnosticSystemStream,
SystemProducer systemProducer,
- Duration terminationDuration, boolean autosizingEnabled) {
+ Duration terminationDuration,
+ boolean autosizingEnabled,
+ Config config) {
this(jobName, jobId, containerModels, containerMemoryMb, containerNumCores, numPersistentStores, maxHeapSizeBytes, containerThreadPoolSize,
containerId, executionEnvContainerId, taskClassVersion, samzaVersion, hostname, diagnosticSystemStream, systemProducer,
terminationDuration, Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder().setNameFormat(PUBLISH_THREAD_NAME).setDaemon(true).build()), autosizingEnabled);
+ new ThreadFactoryBuilder().setNameFormat(PUBLISH_THREAD_NAME).setDaemon(true).build()), autosizingEnabled, config);
}
@VisibleForTesting
@@ -118,7 +122,9 @@ public class DiagnosticsManager {
SystemStream diagnosticSystemStream,
SystemProducer systemProducer,
Duration terminationDuration,
- ScheduledExecutorService executorService, boolean autosizingEnabled) {
+ ScheduledExecutorService executorService,
+ boolean autosizingEnabled,
+ Config config) {
this.jobName = jobName;
this.jobId = jobId;
this.containerModels = containerModels;
@@ -140,6 +146,7 @@ public class DiagnosticsManager {
this.exceptions = new BoundedList<>("exceptions"); // Create a BoundedList with default size and time parameters
this.scheduler = executorService;
this.autosizingEnabled = autosizingEnabled;
+ this.config = config;
resetTime = Instant.now();
this.systemProducer.register(getClass().getSimpleName());
@@ -208,6 +215,7 @@ public class DiagnosticsManager {
diagnosticsStreamMessage.addMaxHeapSize(maxHeapSizeBytes);
diagnosticsStreamMessage.addContainerThreadPoolSize(containerThreadPoolSize);
diagnosticsStreamMessage.addAutosizingEnabled(autosizingEnabled);
+ diagnosticsStreamMessage.addConfig(config);
}
// Add stop event list to the message
diff --git a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java
index 15cce03..bea7ce2 100644
--- a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java
+++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java
@@ -24,6 +24,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.metrics.reporter.Metrics;
import org.apache.samza.metrics.reporter.MetricsHeader;
@@ -60,6 +62,7 @@ public class DiagnosticsStreamMessage {
private static final String CONTAINER_THREAD_POOL_SIZE_METRIC_NAME = "containerThreadPoolSize";
private static final String CONTAINER_MODELS_METRIC_NAME = "containerModels";
private static final String AUTOSIZING_ENABLED_METRIC_NAME = "autosizingEnabled";
+ private static final String CONFIG_METRIC_NAME = "config";
private final MetricsHeader metricsHeader;
private final Map<String, Map<String, Object>> metricsMessage;
@@ -156,6 +159,14 @@ public class DiagnosticsStreamMessage {
}
/**
+ * Add the job's config to the message.
+ * @param config the config to add.
+ */
+ public void addConfig(Config config) {
+ addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, CONFIG_METRIC_NAME, (Map<String, String>) config);
+ }
+
+ /**
* Convert this message into a {@link MetricsSnapshot}, useful for serde-deserde using {@link org.apache.samza.serializers.MetricsSnapshotSerde}.
* @return
*/
@@ -228,6 +239,14 @@ public class DiagnosticsStreamMessage {
return (Boolean) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, AUTOSIZING_ENABLED_METRIC_NAME);
}
+ /**
+ * This method gets the config of the job from the MetricsMessage.
+ * @return the config of the job.
+ */
+ public Config getConfig() {
+ return new MapConfig((Map<String, String>) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, CONFIG_METRIC_NAME));
+ }
+
// Helper method to get a {@link DiagnosticsStreamMessage} from a {@link MetricsSnapshot}.
// * This is typically used when deserializing messages from a diagnostics-stream.
// * @param metricsSnapshot
@@ -254,6 +273,7 @@ public class DiagnosticsStreamMessage {
diagnosticsStreamMessage.addContainerThreadPoolSize((Integer) diagnosticsManagerGroupMap.get(CONTAINER_THREAD_POOL_SIZE_METRIC_NAME));
diagnosticsStreamMessage.addProcessorStopEvents((List<ProcessorStopEvent>) diagnosticsManagerGroupMap.get(STOP_EVENT_LIST_METRIC_NAME));
diagnosticsStreamMessage.addAutosizingEnabled((Boolean) diagnosticsManagerGroupMap.get(AUTOSIZING_ENABLED_METRIC_NAME));
+ diagnosticsStreamMessage.addConfig(new MapConfig((Map<String, String>) diagnosticsManagerGroupMap.get(CONFIG_METRIC_NAME)));
}
if (containerMetricsGroupMap != null && containerMetricsGroupMap.containsKey(EXCEPTION_LIST_METRIC_NAME)) {
diff --git a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java
index 6429a54..8ff58eb 100644
--- a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java
+++ b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.samza.diagnostics;
+import com.google.common.collect.ImmutableMap;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@@ -28,6 +29,8 @@ import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.metrics.reporter.MetricsSnapshot;
import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
@@ -59,6 +62,9 @@ public class TestDiagnosticsManager {
private int numPersistentStores = 2;
private int containerNumCores = 2;
private boolean autosizingEnabled = false;
+ private Config config = new MapConfig(ImmutableMap.of("job.name", jobName, "job.id", jobId,
+ "cluster-manager.container.memory.mb", "1024", "cluster-manager.container. cpu.cores", "1",
+ "cluster-manager.container.retry.count", "8"));
private Map<String, ContainerModel> containerModels = TestDiagnosticsStreamMessage.getSampleContainerModels();
private Collection<DiagnosticsExceptionEvent> exceptionEventList = TestDiagnosticsStreamMessage.getExceptionList();
@@ -80,7 +86,7 @@ public class TestDiagnosticsManager {
this.diagnosticsManager =
new DiagnosticsManager(jobName, jobId, containerModels, containerMb, containerNumCores, numPersistentStores, maxHeapSize, containerThreadPoolSize,
"0", executionEnvContainerId, taskClassVersion, samzaVersion, hostname, diagnosticsSystemStream,
- mockSystemProducer, Duration.ofSeconds(1), mockExecutorService, autosizingEnabled);
+ mockSystemProducer, Duration.ofSeconds(1), mockExecutorService, autosizingEnabled, config);
exceptionEventList.forEach(
diagnosticsExceptionEvent -> this.diagnosticsManager.addExceptionEvent(diagnosticsExceptionEvent));
@@ -95,7 +101,7 @@ public class TestDiagnosticsManager {
new DiagnosticsManager(jobName, jobId, containerModels, containerMb, containerNumCores, numPersistentStores,
maxHeapSize, containerThreadPoolSize, "0", executionEnvContainerId, taskClassVersion, samzaVersion,
hostname, diagnosticsSystemStream, mockSystemProducer, Duration.ofSeconds(1), mockExecutorService,
- autosizingEnabled);
+ autosizingEnabled, config);
diagnosticsManager.start();
@@ -114,7 +120,7 @@ public class TestDiagnosticsManager {
new DiagnosticsManager(jobName, jobId, containerModels, containerMb, containerNumCores, numPersistentStores,
maxHeapSize, containerThreadPoolSize, "0", executionEnvContainerId, taskClassVersion, samzaVersion,
hostname, diagnosticsSystemStream, mockSystemProducer, terminationDuration, mockExecutorService,
- autosizingEnabled);
+ autosizingEnabled, config);
diagnosticsManager.stop();
@@ -134,7 +140,7 @@ public class TestDiagnosticsManager {
new DiagnosticsManager(jobName, jobId, containerModels, containerMb, containerNumCores, numPersistentStores,
maxHeapSize, containerThreadPoolSize, "0", executionEnvContainerId, taskClassVersion, samzaVersion,
hostname, diagnosticsSystemStream, mockSystemProducer, terminationDuration, mockExecutorService,
- autosizingEnabled);
+ autosizingEnabled, config);
diagnosticsManager.stop();
@@ -272,6 +278,7 @@ public class TestDiagnosticsManager {
Assert.assertEquals(containerNumCores, diagnosticsStreamMessage.getContainerNumCores().intValue());
Assert.assertEquals(numPersistentStores, diagnosticsStreamMessage.getNumPersistentStores().intValue());
Assert.assertEquals(autosizingEnabled, diagnosticsStreamMessage.getAutosizingEnabled());
+ Assert.assertEquals(config, diagnosticsStreamMessage.getConfig());
}
private class MockSystemProducer implements SystemProducer {
diff --git a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java
index cd506b2..72b1b5f 100644
--- a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java
+++ b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java
@@ -18,6 +18,7 @@
*/
package org.apache.samza.diagnostics;
+import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -26,6 +27,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.TaskModel;
@@ -46,6 +49,7 @@ public class TestDiagnosticsStreamMessage {
private final String hostname = "sample host name";
private final long timestamp = System.currentTimeMillis();
private final long resetTimestamp = System.currentTimeMillis();
+ private final Config config = new MapConfig(ImmutableMap.of("job.name", jobName, "job.id", jobId));
private DiagnosticsStreamMessage getDiagnosticsStreamMessage() {
DiagnosticsStreamMessage diagnosticsStreamMessage =
@@ -55,6 +59,7 @@ public class TestDiagnosticsStreamMessage {
diagnosticsStreamMessage.addContainerMb(1024);
diagnosticsStreamMessage.addContainerNumCores(2);
diagnosticsStreamMessage.addNumPersistentStores(3);
+ diagnosticsStreamMessage.addConfig(config);
diagnosticsStreamMessage.addProcessorStopEvents(getProcessorStopEventList());
return diagnosticsStreamMessage;
@@ -107,6 +112,7 @@ public class TestDiagnosticsStreamMessage {
Assert.assertEquals(1024, (int) diagnosticsStreamMessage.getContainerMb());
Assert.assertEquals(2, (int) diagnosticsStreamMessage.getContainerNumCores());
Assert.assertEquals(3, (int) diagnosticsStreamMessage.getNumPersistentStores());
+ Assert.assertEquals(config, diagnosticsStreamMessage.getConfig());
Assert.assertEquals(exceptionEventList, diagnosticsStreamMessage.getExceptionEvents());
Assert.assertEquals(getSampleContainerModels(), diagnosticsStreamMessage.getContainerModels());
Assert.assertEquals(diagnosticsStreamMessage.getProcessorStopEvents(), getProcessorStopEventList());
@@ -139,6 +145,7 @@ public class TestDiagnosticsStreamMessage {
Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("containerNumCores"));
Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("containerMemoryMb"));
Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("stopEvents"));
+ Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("config"));
DiagnosticsStreamMessage convertedDiagnosticsStreamMessage =
DiagnosticsStreamMessage.convertToDiagnosticsStreamMessage(metricsSnapshot);