You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ca...@apache.org on 2021/12/07 00:30:12 UTC
[samza] branch master updated: SAMZA-2075: Add execEnvContainerId and execEnvAttemptId for diagnostics for Kubernetes job coordinator (#1566)
This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 1daca89 SAMZA-2075: Add execEnvContainerId and execEnvAttemptId for diagnostics for Kubernetes job coordinator (#1566)
1daca89 is described below
commit 1daca8923396492496732175d87c0367b21355df
Author: Cameron Lee <ca...@linkedin.com>
AuthorDate: Mon Dec 6 16:30:05 2021 -0800
SAMZA-2075: Add execEnvContainerId and execEnvAttemptId for diagnostics for Kubernetes job coordinator (#1566)
API changes (backwards compatible):
JSON representation of MetricsHeader (used in MetricsSnapshot) has an additional optional field called samza-epoch-id, and this field is intended to contain the deployment attempt id which is shared across all containers (job coordinator and workers) of a Samza job. This field is filled in by reading the SAMZA_EPOCH_ID environment variable.
---
.../clustermanager/ContainerProcessManager.java | 2 +-
.../StaticResourceJobCoordinator.java | 23 ++--
.../StaticResourceJobCoordinatorFactory.java | 12 +-
.../org/apache/samza/metrics/reporter/Metrics.java | 2 +-
.../samza/metrics/reporter/MetricsHeader.java | 42 +++++--
.../samza/metrics/reporter/MetricsSnapshot.java | 2 +-
.../metrics/reporter/MetricsSnapshotReporter.java | 12 +-
.../apache/samza/processor/StreamProcessor.java | 3 +-
.../apache/samza/runtime/ContainerLaunchUtil.java | 28 +++--
.../apache/samza/runtime/LocalContainerRunner.java | 11 +-
.../org/apache/samza/util/DiagnosticsUtil.java | 15 +--
.../samza/diagnostics/DiagnosticsManager.java | 28 +++--
.../diagnostics/DiagnosticsStreamMessage.java | 18 ++-
.../TestStaticResourceJobCoordinator.java | 27 +++--
.../samza/diagnostics/TestDiagnosticsManager.java | 125 +++++++++++----------
.../diagnostics/TestDiagnosticsStreamMessage.java | 65 ++++++-----
.../samza/metrics/reporter/TestMetricsHeader.java | 26 ++++-
.../metrics/reporter/TestMetricsSnapshot.java | 4 +-
.../reporter/TestMetricsSnapshotReporter.java | 21 ++--
.../serializers/TestMetricsSnapshotSerde.java | 81 ++++++++-----
.../serializers/TestMetricsSnapshotSerdeV2.java | 93 +++++++++++----
.../org/apache/samza/util/TestDiagnosticsUtil.java | 3 +-
22 files changed, 420 insertions(+), 223 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index 4ef9c68..254e16e 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -161,7 +161,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
Optional<String> execEnvContainerId = Optional.ofNullable(System.getenv(EXEC_ENV_CONTAINER_ID_SYS_PROPERTY));
this.diagnosticsManager =
DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, state.jobModelManager.jobModel(), METRICS_SOURCE_NAME,
- execEnvContainerId, config);
+ execEnvContainerId, Optional.empty(), config);
this.localityManager = localityManager;
// Wire all metrics to all reporters
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java
index 25038e9..a14fa89 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java
@@ -78,6 +78,8 @@ public class StaticResourceJobCoordinator implements JobCoordinator {
private final MetricsRegistry metrics;
private final SystemAdmins systemAdmins;
private final String processorId;
+ private final Optional<String> executionEnvContainerId;
+ private final Optional<String> samzaEpochId;
private final Config config;
private volatile Optional<JobCoordinatorListener> jobCoordinatorListener = Optional.empty();
@@ -106,21 +108,24 @@ public class StaticResourceJobCoordinator implements JobCoordinator {
JobInfoServingContext jobModelServingContext, CoordinatorCommunication coordinatorCommunication,
JobCoordinatorMetadataManager jobCoordinatorMetadataManager,
StreamPartitionCountMonitorFactory streamPartitionCountMonitorFactory,
- StreamRegexMonitorFactory streamRegexMonitorFactory, StartpointManager startpointManager,
+ StreamRegexMonitorFactory streamRegexMonitorFactory, Optional<StartpointManager> startpointManager,
ChangelogStreamManager changelogStreamManager, JobRestartSignal jobRestartSignal, MetricsRegistry metrics,
- SystemAdmins systemAdmins, Config config) {
+ SystemAdmins systemAdmins, Optional<String> executionEnvContainerId, Optional<String> samzaEpochId,
+ Config config) {
this.jobModelHelper = jobModelHelper;
this.jobModelServingContext = jobModelServingContext;
this.coordinatorCommunication = coordinatorCommunication;
this.jobCoordinatorMetadataManager = jobCoordinatorMetadataManager;
this.streamPartitionCountMonitorFactory = streamPartitionCountMonitorFactory;
this.streamRegexMonitorFactory = streamRegexMonitorFactory;
- this.startpointManager = Optional.ofNullable(startpointManager);
+ this.startpointManager = startpointManager;
this.changelogStreamManager = changelogStreamManager;
this.jobRestartSignal = jobRestartSignal;
this.metrics = metrics;
this.systemAdmins = systemAdmins;
this.processorId = processorId;
+ this.executionEnvContainerId = executionEnvContainerId;
+ this.samzaEpochId = samzaEpochId;
this.config = config;
}
@@ -232,9 +237,9 @@ public class StaticResourceJobCoordinator implements JobCoordinator {
private Optional<DiagnosticsManager> diagnosticsManager(JobModel jobModel) {
JobConfig jobConfig = new JobConfig(this.config);
String jobName = jobConfig.getName().orElseThrow(() -> new ConfigException("Missing job name"));
- // TODO SAMZA-2705: construct execEnvContainerId for diagnostics
return buildDiagnosticsManager(jobName, jobConfig.getJobId(), jobModel,
- CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME, Optional.empty(), this.config);
+ CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME, this.executionEnvContainerId, this.samzaEpochId,
+ this.config);
}
/**
@@ -271,9 +276,11 @@ public class StaticResourceJobCoordinator implements JobCoordinator {
* Wrapper around {@link DiagnosticsUtil#buildDiagnosticsManager} so it can be stubbed during testing.
*/
@VisibleForTesting
- Optional<DiagnosticsManager> buildDiagnosticsManager(String jobName,
- String jobId, JobModel jobModel, String containerId, Optional<String> execEnvContainerId, Config config) {
- return DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, containerId, execEnvContainerId, config);
+ Optional<DiagnosticsManager> buildDiagnosticsManager(String jobName, String jobId, JobModel jobModel,
+ String containerId, Optional<String> executionEnvContainerId, Optional<String> samzaEpochId,
+ Config config) {
+ return DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, containerId, executionEnvContainerId,
+ samzaEpochId, config);
}
private Set<JobMetadataChange> checkForMetadataChanges(JobCoordinatorMetadata newMetadata) {
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinatorFactory.java
index 718778d..b8704bf 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinatorFactory.java
@@ -18,9 +18,11 @@
*/
package org.apache.samza.coordinator.staticresource;
+import java.util.Optional;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.container.LocalityManager;
import org.apache.samza.container.grouper.task.TaskAssignmentManager;
import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
@@ -44,6 +46,7 @@ import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMes
import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
+import org.apache.samza.environment.EnvironmentVariables;
import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.metrics.MetricsRegistry;
@@ -73,8 +76,8 @@ public class StaticResourceJobCoordinatorFactory implements JobCoordinatorFactor
JobRestartSignal jobRestartSignal =
ReflectionUtil.getObj(new JobCoordinatorConfig(config).getJobRestartSignalFactory(),
JobRestartSignalFactory.class).build(new JobRestartSignalFactoryContext(config));
- StartpointManager startpointManager =
- jobConfig.getStartpointEnabled() ? new StartpointManager(metadataStore) : null;
+ Optional<StartpointManager> startpointManager =
+ jobConfig.getStartpointEnabled() ? Optional.of(new StartpointManager(metadataStore)) : Optional.empty();
SystemAdmins systemAdmins = new SystemAdmins(config, StaticResourceJobCoordinator.class.getSimpleName());
StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
JobModelHelper jobModelHelper = buildJobModelHelper(metadataStore, streamMetadataCache);
@@ -82,10 +85,13 @@ public class StaticResourceJobCoordinatorFactory implements JobCoordinatorFactor
new StreamPartitionCountMonitorFactory(streamMetadataCache, metricsRegistry);
StreamRegexMonitorFactory streamRegexMonitorFactory =
new StreamRegexMonitorFactory(streamMetadataCache, metricsRegistry);
+ Optional<String> executionEnvContainerId =
+ Optional.ofNullable(System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID));
+ Optional<String> samzaEpochId = Optional.ofNullable(System.getenv(EnvironmentVariables.SAMZA_EPOCH_ID));
return new StaticResourceJobCoordinator(processorId, jobModelHelper, jobModelServingContext,
coordinatorCommunication, jobCoordinatorMetadataManager, streamPartitionCountMonitorFactory,
streamRegexMonitorFactory, startpointManager, changelogStreamManager, jobRestartSignal, metricsRegistry,
- systemAdmins, config);
+ systemAdmins, executionEnvContainerId, samzaEpochId, config);
}
private static JobModelHelper buildJobModelHelper(MetadataStore metadataStore,
diff --git a/samza-core/src/main/java/org/apache/samza/metrics/reporter/Metrics.java b/samza-core/src/main/java/org/apache/samza/metrics/reporter/Metrics.java
index 8c321c5..fa86565 100644
--- a/samza-core/src/main/java/org/apache/samza/metrics/reporter/Metrics.java
+++ b/samza-core/src/main/java/org/apache/samza/metrics/reporter/Metrics.java
@@ -72,6 +72,6 @@ public class Metrics {
@Override
public String toString() {
- return "MetricsJava{" + "immutableMetrics=" + immutableMetrics + '}';
+ return "Metrics{" + "immutableMetrics=" + immutableMetrics + '}';
}
}
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/metrics/reporter/MetricsHeader.java b/samza-core/src/main/java/org/apache/samza/metrics/reporter/MetricsHeader.java
index 5215038..29d7f10 100644
--- a/samza-core/src/main/java/org/apache/samza/metrics/reporter/MetricsHeader.java
+++ b/samza-core/src/main/java/org/apache/samza/metrics/reporter/MetricsHeader.java
@@ -21,6 +21,7 @@ package org.apache.samza.metrics.reporter;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
public class MetricsHeader {
@@ -28,6 +29,7 @@ public class MetricsHeader {
private static final String JOB_ID = "job-id";
private static final String CONTAINER_NAME = "container-name";
private static final String EXEC_ENV_CONTAINER_ID = "exec-env-container-id";
+ private static final String SAMZA_EPOCH_ID = "samza-epoch-id";
private static final String SOURCE = "source";
private static final String VERSION = "version";
private static final String SAMZA_VERSION = "samza-version";
@@ -39,6 +41,10 @@ public class MetricsHeader {
private final String jobId;
private final String containerName;
private final String execEnvironmentContainerId;
+ /**
+ * This is optional for backwards compatibility. It was added after the initial version of this class.
+ */
+ private final Optional<String> samzaEpochId;
private final String source;
private final String version;
private final String samzaVersion;
@@ -48,10 +54,18 @@ public class MetricsHeader {
public MetricsHeader(String jobName, String jobId, String containerName, String execEnvironmentContainerId,
String source, String version, String samzaVersion, String host, long time, long resetTime) {
+ this(jobName, jobId, containerName, execEnvironmentContainerId, Optional.empty(), source, version, samzaVersion,
+ host, time, resetTime);
+ }
+
+ public MetricsHeader(String jobName, String jobId, String containerName, String execEnvironmentContainerId,
+ Optional<String> samzaEpochId, String source, String version, String samzaVersion, String host, long time,
+ long resetTime) {
this.jobName = jobName;
this.jobId = jobId;
this.containerName = containerName;
this.execEnvironmentContainerId = execEnvironmentContainerId;
+ this.samzaEpochId = samzaEpochId;
this.source = source;
this.version = version;
this.samzaVersion = samzaVersion;
@@ -66,6 +80,7 @@ public class MetricsHeader {
map.put(JOB_ID, jobId);
map.put(CONTAINER_NAME, containerName);
map.put(EXEC_ENV_CONTAINER_ID, execEnvironmentContainerId);
+ this.samzaEpochId.ifPresent(epochId -> map.put(SAMZA_EPOCH_ID, epochId));
map.put(SOURCE, source);
map.put(VERSION, version);
map.put(SAMZA_VERSION, samzaVersion);
@@ -91,6 +106,14 @@ public class MetricsHeader {
return execEnvironmentContainerId;
}
+ /**
+ * Epoch id for the application, which is consistent across all components of a single deployment attempt.
+ * This may be empty, since this field was added to this class after the initial version.
+ */
+ public Optional<String> getSamzaEpochId() {
+ return samzaEpochId;
+ }
+
public String getSource() {
return source;
}
@@ -120,6 +143,8 @@ public class MetricsHeader {
map.get(JOB_ID).toString(),
map.get(CONTAINER_NAME).toString(),
map.get(EXEC_ENV_CONTAINER_ID).toString(),
+ // need to check existence for backwards compatibility with initial version of this class
+ Optional.ofNullable(map.get(SAMZA_EPOCH_ID)).map(Object::toString),
map.get(SOURCE).toString(),
map.get(VERSION).toString(),
map.get(SAMZA_VERSION).toString(),
@@ -139,22 +164,23 @@ public class MetricsHeader {
MetricsHeader that = (MetricsHeader) o;
return time == that.time && resetTime == that.resetTime && Objects.equals(jobName, that.jobName) && Objects.equals(
jobId, that.jobId) && Objects.equals(containerName, that.containerName) && Objects.equals(
- execEnvironmentContainerId, that.execEnvironmentContainerId) && Objects.equals(source, that.source)
- && Objects.equals(version, that.version) && Objects.equals(samzaVersion, that.samzaVersion) && Objects.equals(
- host, that.host);
+ execEnvironmentContainerId, that.execEnvironmentContainerId) && Objects.equals(samzaEpochId,
+ that.samzaEpochId) && Objects.equals(source, that.source) && Objects.equals(version, that.version)
+ && Objects.equals(samzaVersion, that.samzaVersion) && Objects.equals(host, that.host);
}
@Override
public int hashCode() {
- return Objects.hash(jobName, jobId, containerName, execEnvironmentContainerId, source, version, samzaVersion, host,
- time, resetTime);
+ return Objects.hash(jobName, jobId, containerName, execEnvironmentContainerId, samzaEpochId, source,
+ version, samzaVersion, host, time, resetTime);
}
@Override
public String toString() {
return "MetricsHeader{" + "jobName='" + jobName + '\'' + ", jobId='" + jobId + '\'' + ", containerName='"
- + containerName + '\'' + ", execEnvironmentContainerId='" + execEnvironmentContainerId + '\'' + ", source='"
- + source + '\'' + ", version='" + version + '\'' + ", samzaVersion='" + samzaVersion + '\'' + ", host='" + host
- + '\'' + ", time=" + time + ", resetTime=" + resetTime + '}';
+ + containerName + '\'' + ", execEnvironmentContainerId='" + execEnvironmentContainerId + '\''
+ + ", samzaEpochId=" + samzaEpochId + ", source='" + source + '\'' + ", version='" + version + '\''
+ + ", samzaVersion='" + samzaVersion + '\'' + ", host='" + host + '\'' + ", time=" + time + ", resetTime="
+ + resetTime + '}';
}
}
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/metrics/reporter/MetricsSnapshot.java b/samza-core/src/main/java/org/apache/samza/metrics/reporter/MetricsSnapshot.java
index 00b9deb..0454f79 100644
--- a/samza-core/src/main/java/org/apache/samza/metrics/reporter/MetricsSnapshot.java
+++ b/samza-core/src/main/java/org/apache/samza/metrics/reporter/MetricsSnapshot.java
@@ -75,6 +75,6 @@ public class MetricsSnapshot {
@Override
public String toString() {
- return "MetricsSnapshotJava{" + "metricsHeader=" + metricsHeader + ", metrics=" + metrics + '}';
+ return "MetricsSnapshot{" + "metricsHeader=" + metricsHeader + ", metrics=" + metrics + '}';
}
}
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.java b/samza-core/src/main/java/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.java
index 648280a..c57c1cd 100644
--- a/samza-core/src/main/java/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.java
+++ b/samza-core/src/main/java/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.java
@@ -33,6 +33,7 @@ import java.util.regex.Pattern;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.samza.SamzaException;
import org.apache.samza.config.ShellCommandConfig;
+import org.apache.samza.environment.EnvironmentVariables;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.MetricsRegistryWithSource;
@@ -75,7 +76,8 @@ public class MetricsSnapshotReporter implements MetricsReporter, Runnable {
private final Optional<Pattern> blacklist;
private final Clock clock;
- private final String execEnvironmentContainerId;
+ private final String executionEnvContainerId;
+ private final String samzaEpochId;
private final ScheduledExecutorService executor;
private final long resetTime;
private final List<MetricsRegistryWithSource> registries = new ArrayList<>();
@@ -97,8 +99,9 @@ public class MetricsSnapshotReporter implements MetricsReporter, Runnable {
this.blacklist = blacklist;
this.clock = clock;
- this.execEnvironmentContainerId =
+ this.executionEnvContainerId =
Optional.ofNullable(System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID)).orElse("");
+ this.samzaEpochId = Optional.ofNullable(System.getenv(EnvironmentVariables.SAMZA_EPOCH_ID)).orElse("");
this.executor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("Samza MetricsSnapshotReporter Thread-%d").setDaemon(true).build());
this.resetTime = this.clock.currentTimeMillis();
@@ -193,8 +196,9 @@ public class MetricsSnapshotReporter implements MetricsReporter, Runnable {
// publish to Kafka only if the metricsMsg carries any metrics
if (!metricsMsg.isEmpty()) {
MetricsHeader header =
- new MetricsHeader(this.jobName, this.jobId, this.containerName, this.execEnvironmentContainerId, source,
- this.version, this.samzaVersion, this.host, this.clock.currentTimeMillis(), this.resetTime);
+ new MetricsHeader(this.jobName, this.jobId, this.containerName, this.executionEnvContainerId,
+ Optional.of(this.samzaEpochId), source, this.version, this.samzaVersion, this.host,
+ this.clock.currentTimeMillis(), this.resetTime);
Metrics metrics = new Metrics(metricsMsg);
LOG.debug("Flushing metrics for {} to {} with header and map: header={}, map={}.", source, out,
header.getAsMap(), metrics.getAsMap());
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 92eca0b..2994606 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -382,7 +382,8 @@ public class StreamProcessor {
String jobName = new JobConfig(config).getName().get();
String jobId = new JobConfig(config).getJobId();
Optional<DiagnosticsManager> diagnosticsManager =
- DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, processorId, Optional.empty(), config);
+ DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, processorId, Optional.empty(),
+ Optional.empty(), config);
// Metadata store lifecycle managed outside of the SamzaContainer.
// All manager lifecycles are managed in the SamzaContainer including startpointManager
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index 89ec196..e9f4311 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -41,6 +41,7 @@ import org.apache.samza.coordinator.stream.messages.SetConfig;
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
import org.apache.samza.coordinator.stream.messages.SetExecutionEnvContainerIdMapping;
import org.apache.samza.diagnostics.DiagnosticsManager;
+import org.apache.samza.environment.EnvironmentVariables;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.logging.LoggingContextHolder;
import org.apache.samza.metadatastore.MetadataStore;
@@ -67,9 +68,12 @@ public class ContainerLaunchUtil {
* Any change here needs to take Beam into account.
*/
public static void run(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc, String containerId, JobModel jobModel) {
- Optional<String> execEnvContainerId = Optional.ofNullable(System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID));
+ Optional<String> executionEnvContainerId =
+ Optional.ofNullable(System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID));
+ Optional<String> samzaEpochId = Optional.ofNullable(System.getenv(EnvironmentVariables.SAMZA_EPOCH_ID));
JobConfig jobConfig = new JobConfig(jobModel.getConfig());
- ContainerLaunchUtil.run(appDesc, jobConfig.getName().get(), jobConfig.getJobId(), containerId, execEnvContainerId, jobModel);
+ ContainerLaunchUtil.run(appDesc, jobConfig.getName().get(), jobConfig.getJobId(), containerId,
+ executionEnvContainerId, samzaEpochId, jobModel);
}
/**
@@ -77,7 +81,11 @@ public class ContainerLaunchUtil {
*/
public static void run(
ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc,
- String jobName, String jobId, String containerId, Optional<String> execEnvContainerId,
+ String jobName,
+ String jobId,
+ String containerId,
+ Optional<String> executionEnvContainerId,
+ Optional<String> samzaEpochId,
JobModel jobModel) {
Config config = jobModel.getConfig();
@@ -87,8 +95,9 @@ public class ContainerLaunchUtil {
MDC.put("jobId", jobId);
LoggingContextHolder.INSTANCE.setConfig(jobModel.getConfig());
- DiagnosticsUtil.writeMetadataFile(jobName, jobId, containerId, execEnvContainerId, config);
- run(appDesc, jobName, jobId, containerId, execEnvContainerId, jobModel, config, buildExternalContext(config));
+ DiagnosticsUtil.writeMetadataFile(jobName, jobId, containerId, executionEnvContainerId, config);
+ run(appDesc, jobName, jobId, containerId, executionEnvContainerId, samzaEpochId, jobModel, config,
+ buildExternalContext(config));
System.exit(0);
}
@@ -98,7 +107,8 @@ public class ContainerLaunchUtil {
String jobName,
String jobId,
String containerId,
- Optional<String> execEnvContainerIdOptional,
+ Optional<String> executionEnvContainerId,
+ Optional<String> samzaEpochId,
JobModel jobModel,
Config config,
Optional<ExternalContext> externalContextOptional) {
@@ -119,8 +129,8 @@ public class ContainerLaunchUtil {
// Creating diagnostics manager and reporter, and wiring it respectively
Optional<DiagnosticsManager> diagnosticsManager =
- DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, containerId, execEnvContainerIdOptional,
- config);
+ DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, containerId, executionEnvContainerId,
+ samzaEpochId, config);
MetricsRegistryMap metricsRegistryMap = new MetricsRegistryMap();
SamzaContainer container = SamzaContainer$.MODULE$.apply(
@@ -149,7 +159,7 @@ public class ContainerLaunchUtil {
}
if (new JobConfig(config).getApplicationMasterHighAvailabilityEnabled()) {
- execEnvContainerIdOptional.ifPresent(execEnvContainerId -> {
+ executionEnvContainerId.ifPresent(execEnvContainerId -> {
ExecutionContainerIdManager executionContainerIdManager = new ExecutionContainerIdManager(
new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetExecutionEnvContainerIdMapping.TYPE));
executionContainerIdManager.writeExecutionEnvironmentContainerIdMapping(containerId, execEnvContainerId);
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index 087d8bd..1c82000 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -30,6 +30,7 @@ import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.environment.EnvironmentVariables;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.logging.LoggingContextHolder;
import org.apache.samza.util.SamzaUncaughtExceptionHandler;
@@ -56,7 +57,9 @@ public class LocalContainerRunner {
String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL);
System.out.println(String.format("Coordinator URL: %s", coordinatorUrl));
- Optional<String> execEnvContainerId = Optional.ofNullable(System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID));
+ Optional<String> executionEnvContainerId =
+ Optional.ofNullable(System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID));
+ Optional<String> samzaEpochId = Optional.ofNullable(System.getenv(EnvironmentVariables.SAMZA_EPOCH_ID));
int delay = new Random().nextInt(SamzaContainer.DEFAULT_READ_JOBMODEL_DELAY_MS()) + 1;
JobModel jobModel = SamzaContainer.readJobModel(coordinatorUrl, delay);
@@ -75,7 +78,7 @@ public class LocalContainerRunner {
ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc =
ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(config), config);
- ContainerLaunchUtil.run(appDesc, jobName, jobId, containerId, execEnvContainerId, jobModel);
+ ContainerLaunchUtil.run(appDesc, jobName, jobId, containerId, executionEnvContainerId, samzaEpochId,
+ jobModel);
}
-
-}
+}
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
index f141d92..7ef29ac 100644
--- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
@@ -87,12 +87,13 @@ public class DiagnosticsUtil {
}
/**
- * Create a pair of DiagnosticsManager and Reporter for the given jobName, jobId, containerId, and execEnvContainerId,
- * if diagnostics is enabled.
- * execEnvContainerId is the ID assigned to the container by the cluster manager (e.g., YARN).
+ * Create a {@link DiagnosticsManager} for the given jobName, jobId, containerId, and execEnvContainerId, if
+ * diagnostics is enabled.
+ * @param executionEnvContainerId ID assigned to the container by the cluster manager (e.g. YARN)
+ * @param samzaEpochId ID assigned to the job deployment attempt by the cluster manager
*/
- public static Optional<DiagnosticsManager> buildDiagnosticsManager(String jobName,
- String jobId, JobModel jobModel, String containerId, Optional<String> execEnvContainerId, Config config) {
+ public static Optional<DiagnosticsManager> buildDiagnosticsManager(String jobName, String jobId, JobModel jobModel,
+ String containerId, Optional<String> executionEnvContainerId, Optional<String> samzaEpochId, Config config) {
JobConfig jobConfig = new JobConfig(config);
MetricsConfig metricsConfig = new MetricsConfig(config);
@@ -132,8 +133,8 @@ public class DiagnosticsUtil {
DiagnosticsManager diagnosticsManager =
new DiagnosticsManager(jobName, jobId, jobModel.getContainers(), containerMemoryMb, containerNumCores,
new StorageConfig(config).getNumPersistentStores(), maxHeapSizeBytes, containerThreadPoolSize,
- containerId, execEnvContainerId.orElse(""), taskClassVersion, samzaVersion, hostName,
- diagnosticsSystemStream, systemProducer,
+ containerId, executionEnvContainerId.orElse(""), samzaEpochId.orElse(""), taskClassVersion,
+ samzaVersion, hostName, diagnosticsSystemStream, systemProducer,
Duration.ofMillis(new TaskConfig(config).getShutdownMs()), jobConfig.getAutosizingEnabled(), config);
diagnosticsManagerOptional = Optional.of(diagnosticsManager);
diff --git a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
index 4389fcc..f420a93 100644
--- a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
@@ -25,6 +25,7 @@ import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -35,7 +36,9 @@ import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.Clock;
import org.apache.samza.util.ReflectionUtil;
+import org.apache.samza.util.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,6 +59,7 @@ public class DiagnosticsManager {
private final String jobId;
private final String containerId;
private final String executionEnvContainerId;
+ private final String samzaEpochId;
private final String taskClassVersion;
private final String samzaVersion;
private final String hostname;
@@ -70,6 +74,7 @@ public class DiagnosticsManager {
private final Map<String, ContainerModel> containerModels;
private final boolean autosizingEnabled;
private final Config config;
+ private final Clock clock;
private boolean jobParamsEmitted = false;
private final SystemProducer systemProducer; // SystemProducer for writing diagnostics data
@@ -90,6 +95,7 @@ public class DiagnosticsManager {
int containerThreadPoolSize,
String containerId,
String executionEnvContainerId,
+ String samzaEpochId,
String taskClassVersion,
String samzaVersion,
String hostname,
@@ -99,10 +105,12 @@ public class DiagnosticsManager {
boolean autosizingEnabled,
Config config) {
- this(jobName, jobId, containerModels, containerMemoryMb, containerNumCores, numPersistentStores, maxHeapSizeBytes, containerThreadPoolSize,
- containerId, executionEnvContainerId, taskClassVersion, samzaVersion, hostname, diagnosticSystemStream, systemProducer,
- terminationDuration, Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder().setNameFormat(PUBLISH_THREAD_NAME).setDaemon(true).build()), autosizingEnabled, config);
+ this(jobName, jobId, containerModels, containerMemoryMb, containerNumCores, numPersistentStores, maxHeapSizeBytes,
+ containerThreadPoolSize, containerId, executionEnvContainerId, samzaEpochId, taskClassVersion,
+ samzaVersion, hostname, diagnosticSystemStream, systemProducer, terminationDuration,
+ Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder().setNameFormat(PUBLISH_THREAD_NAME).setDaemon(true).build()), autosizingEnabled,
+ config, SystemClock.instance());
}
@VisibleForTesting
@@ -116,6 +124,7 @@ public class DiagnosticsManager {
int containerThreadPoolSize,
String containerId,
String executionEnvContainerId,
+ String samzaEpochId,
String taskClassVersion,
String samzaVersion,
String hostname,
@@ -124,7 +133,8 @@ public class DiagnosticsManager {
Duration terminationDuration,
ScheduledExecutorService executorService,
boolean autosizingEnabled,
- Config config) {
+ Config config,
+ Clock clock) {
this.jobName = jobName;
this.jobId = jobId;
this.containerModels = containerModels;
@@ -135,6 +145,7 @@ public class DiagnosticsManager {
this.containerThreadPoolSize = containerThreadPoolSize;
this.containerId = containerId;
this.executionEnvContainerId = executionEnvContainerId;
+ this.samzaEpochId = samzaEpochId;
this.taskClassVersion = taskClassVersion;
this.samzaVersion = samzaVersion;
this.hostname = hostname;
@@ -147,8 +158,9 @@ public class DiagnosticsManager {
this.scheduler = executorService;
this.autosizingEnabled = autosizingEnabled;
this.config = config;
+ this.clock = clock;
- resetTime = Instant.now();
+ this.resetTime = Instant.ofEpochMilli(this.clock.currentTimeMillis());
this.systemProducer.register(getClass().getSimpleName());
try {
@@ -199,13 +211,13 @@ public class DiagnosticsManager {
}
private class DiagnosticsStreamPublisher implements Runnable {
-
@Override
public void run() {
try {
DiagnosticsStreamMessage diagnosticsStreamMessage =
new DiagnosticsStreamMessage(jobName, jobId, "samza-container-" + containerId, executionEnvContainerId,
- taskClassVersion, samzaVersion, hostname, System.currentTimeMillis(), resetTime.toEpochMilli());
+ Optional.of(samzaEpochId), taskClassVersion, samzaVersion, hostname,
+ clock.currentTimeMillis(), resetTime.toEpochMilli());
// Add job-related params to the message (if not already published)
if (!jobParamsEmitted) {
diff --git a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java
index ffe056e..9cc8d72 100644
--- a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java
+++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.samza.config.Config;
@@ -68,13 +69,10 @@ public class DiagnosticsStreamMessage {
private final Map<String, Map<String, Object>> metricsMessage;
public DiagnosticsStreamMessage(String jobName, String jobId, String containerName, String executionEnvContainerId,
- String taskClassVersion, String samzaVersion, String hostname, long timestamp, long resetTimestamp) {
-
- // Create the metricHeader
- metricsHeader =
- new MetricsHeader(jobName, jobId, containerName, executionEnvContainerId, DiagnosticsManager.class.getName(),
- taskClassVersion, samzaVersion, hostname, timestamp, resetTimestamp);
-
+ Optional<String> samzaEpochId, String taskClassVersion, String samzaVersion, String hostname,
+ long timestamp, long resetTimestamp) {
+ this.metricsHeader = new MetricsHeader(jobName, jobId, containerName, executionEnvContainerId, samzaEpochId,
+ DiagnosticsManager.class.getName(), taskClassVersion, samzaVersion, hostname, timestamp, resetTimestamp);
this.metricsMessage = new HashMap<>();
}
@@ -254,9 +252,9 @@ public class DiagnosticsStreamMessage {
DiagnosticsStreamMessage diagnosticsStreamMessage =
new DiagnosticsStreamMessage(metricsSnapshot.getHeader().getJobName(), metricsSnapshot.getHeader().getJobId(),
metricsSnapshot.getHeader().getContainerName(), metricsSnapshot.getHeader().getExecEnvironmentContainerId(),
- metricsSnapshot.getHeader().getVersion(), metricsSnapshot.getHeader().getSamzaVersion(),
- metricsSnapshot.getHeader().getHost(), metricsSnapshot.getHeader().getTime(),
- metricsSnapshot.getHeader().getResetTime());
+ metricsSnapshot.getHeader().getSamzaEpochId(), metricsSnapshot.getHeader().getVersion(),
+ metricsSnapshot.getHeader().getSamzaVersion(), metricsSnapshot.getHeader().getHost(),
+ metricsSnapshot.getHeader().getTime(), metricsSnapshot.getHeader().getResetTime());
Map<String, Map<String, Object>> metricsMap = metricsSnapshot.getMetrics().getAsMap();
Map<String, Object> diagnosticsManagerGroupMap = metricsMap.get(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER);
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java
index 9386883..b55b490 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java
@@ -87,6 +87,8 @@ public class TestStaticResourceJobCoordinator {
private static final String JOB_NAME = "my-samza-job";
private static final String JOB_ID = "123";
private static final String PROCESSOR_ID = "samza-job-coordinator";
+ private static final Optional<String> EXECUTION_ENV_CONTAINER_ID = Optional.of("execution_container_123");
+ private static final Optional<String> SAMZA_EPOCH_ID = Optional.of("epoch_123");
private static final SystemStream SYSTEM_STREAM = new SystemStream("system", "stream");
private static final TaskName TASK_NAME = new TaskName("Partition " + 0);
private static final Map<String, ContainerModel> CONTAINERS = ImmutableMap.of("0", new ContainerModel("0",
@@ -136,8 +138,8 @@ public class TestStaticResourceJobCoordinator {
this.staticResourceJobCoordinator =
spy(new StaticResourceJobCoordinator(PROCESSOR_ID, this.jobModelHelper, this.jobModelServingContext,
this.coordinatorCommunication, this.jobCoordinatorMetadataManager, this.streamPartitionCountMonitorFactory,
- this.streamRegexMonitorFactory, this.startpointManager, this.changelogStreamManager, this.jobRestartSignal,
- this.metrics, this.systemAdmins, this.config));
+ this.streamRegexMonitorFactory, Optional.of(this.startpointManager), this.changelogStreamManager,
+ this.jobRestartSignal, this.metrics, this.systemAdmins, EXECUTION_ENV_CONTAINER_ID, SAMZA_EPOCH_ID, this.config));
this.staticResourceJobCoordinator.setListener(this.jobCoordinatorListener);
doNothing().when(this.staticResourceJobCoordinator).doSetLoggingContextConfig(any());
}
@@ -224,8 +226,8 @@ public class TestStaticResourceJobCoordinator {
this.staticResourceJobCoordinator =
spy(new StaticResourceJobCoordinator(PROCESSOR_ID, this.jobModelHelper, this.jobModelServingContext,
this.coordinatorCommunication, this.jobCoordinatorMetadataManager, this.streamPartitionCountMonitorFactory,
- this.streamRegexMonitorFactory, null, this.changelogStreamManager, this.jobRestartSignal, this.metrics,
- this.systemAdmins, this.config));
+ this.streamRegexMonitorFactory, Optional.empty(), this.changelogStreamManager, this.jobRestartSignal,
+ this.metrics, this.systemAdmins, Optional.empty(), Optional.empty(), this.config));
Config jobModelConfig = mock(Config.class);
JobModel jobModel = setupJobModel(jobModelConfig);
StreamPartitionCountMonitor streamPartitionCountMonitor = setupStreamPartitionCountMonitor(jobModelConfig);
@@ -233,13 +235,15 @@ public class TestStaticResourceJobCoordinator {
JobCoordinatorMetadata newMetadata = setupJobCoordinatorMetadata(jobModel, jobModelConfig,
ImmutableSet.copyOf(Arrays.asList(JobMetadataChange.values())), false);
doReturn(Optional.empty()).when(this.staticResourceJobCoordinator)
- .buildDiagnosticsManager(JOB_NAME, JOB_ID, jobModel,
- CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME, Optional.empty(), this.config);
+ .buildDiagnosticsManager(JOB_NAME, JOB_ID, jobModel, CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME,
+ Optional.empty(), Optional.empty(), this.config);
MetadataResourceUtil metadataResourceUtil = metadataResourceUtil(jobModel);
this.staticResourceJobCoordinator.start();
assertEquals(jobModel, this.staticResourceJobCoordinator.getJobModel());
verify(this.systemAdmins).start();
verify(this.staticResourceJobCoordinator).doSetLoggingContextConfig(jobModelConfig);
+ verify(this.staticResourceJobCoordinator).buildDiagnosticsManager(JOB_NAME, JOB_ID, jobModel,
+ CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME, Optional.empty(), Optional.empty(), this.config);
verifyPrepareWorkerExecutionAndMonitor(jobModel, metadataResourceUtil, streamPartitionCountMonitor, null,
newMetadata, null);
verifyZeroInteractions(this.jobCoordinatorListener, this.startpointManager);
@@ -275,8 +279,8 @@ public class TestStaticResourceJobCoordinator {
this.staticResourceJobCoordinator =
spy(new StaticResourceJobCoordinator(PROCESSOR_ID, this.jobModelHelper, this.jobModelServingContext,
this.coordinatorCommunication, this.jobCoordinatorMetadataManager, this.streamPartitionCountMonitorFactory,
- this.streamRegexMonitorFactory, null, this.changelogStreamManager, this.jobRestartSignal, this.metrics,
- this.systemAdmins, this.config));
+ this.streamRegexMonitorFactory, Optional.empty(), this.changelogStreamManager, this.jobRestartSignal,
+ this.metrics, this.systemAdmins, Optional.empty(), Optional.empty(), this.config));
Config jobModelConfig = mock(Config.class);
JobModel jobModel = setupJobModel(jobModelConfig);
@@ -285,8 +289,8 @@ public class TestStaticResourceJobCoordinator {
setupJobCoordinatorMetadata(jobModel, jobModelConfig,
ImmutableSet.copyOf(Arrays.asList(JobMetadataChange.values())), false);
doReturn(Optional.empty()).when(this.staticResourceJobCoordinator)
- .buildDiagnosticsManager(JOB_NAME, JOB_ID, jobModel,
- CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME, Optional.empty(), this.config);
+ .buildDiagnosticsManager(JOB_NAME, JOB_ID, jobModel, CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME,
+ Optional.empty(), Optional.empty(), this.config);
metadataResourceUtil(jobModel);
// call start in order to set up monitors
this.staticResourceJobCoordinator.start();
@@ -424,7 +428,8 @@ public class TestStaticResourceJobCoordinator {
private void setUpDiagnosticsManager(JobModel expectedJobModel) {
doReturn(Optional.of(this.diagnosticsManager)).when(this.staticResourceJobCoordinator)
.buildDiagnosticsManager(JOB_NAME, JOB_ID, expectedJobModel,
- CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME, Optional.empty(), this.config);
+ CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME, EXECUTION_ENV_CONTAINER_ID, SAMZA_EPOCH_ID,
+ this.config);
}
private void verifyStartLifecycle() {
diff --git a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java
index 8ff58eb..5ef6129 100644
--- a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java
+++ b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java
@@ -26,17 +26,20 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.metrics.reporter.MetricsHeader;
import org.apache.samza.metrics.reporter.MetricsSnapshot;
import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.Clock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -48,29 +51,34 @@ public class TestDiagnosticsManager {
private DiagnosticsManager diagnosticsManager;
private MockSystemProducer mockSystemProducer;
private ScheduledExecutorService mockExecutorService;
- private SystemStream diagnosticsSystemStream = new SystemStream("kafka", "test stream");
-
- private String jobName = "Testjob";
- private String jobId = "test job id";
- private String executionEnvContainerId = "exec container id";
- private String taskClassVersion = "0.0.1";
- private String samzaVersion = "1.3.0";
- private String hostname = "sample host name";
- private int containerMb = 1024;
- private int containerThreadPoolSize = 2;
- private long maxHeapSize = 900;
- private int numPersistentStores = 2;
- private int containerNumCores = 2;
- private boolean autosizingEnabled = false;
- private Config config = new MapConfig(ImmutableMap.of("job.name", jobName, "job.id", jobId,
+ private final SystemStream diagnosticsSystemStream = new SystemStream("kafka", "test stream");
+
+ private static final String JOB_NAME = "Testjob";
+ private static final String JOB_ID = "test job id";
+ private static final String EXECUTION_ENV_CONTAINER_ID = "exec container id";
+ private static final String SAMZA_EPOCH_ID = "epoch-123";
+ private static final String TASK_CLASS_VERSION = "0.0.1";
+ private static final String SAMZA_VERSION = "1.3.0";
+ private static final String HOSTNAME = "sample host name";
+ private static final int CONTAINER_MB = 1024;
+ private static final int CONTAINER_THREAD_POOL_SIZE = 2;
+ private static final long MAX_HEAP_SIZE = 900;
+ private static final int NUM_PERSISTENT_STORES = 2;
+ private static final int CONTAINER_NUM_CORES = 2;
+ private static final boolean AUTOSIZING_ENABLED = false;
+ private static final long RESET_TIME = 10;
+ private static final long FIRST_SEND_TIME = 20;
+ private static final long SECOND_SEND_TIME = 30;
+ private final Config config = new MapConfig(ImmutableMap.of("job.name", JOB_NAME, "job.id", JOB_ID,
"cluster-manager.container.memory.mb", "1024", "cluster-manager.container. cpu.cores", "1",
"cluster-manager.container.retry.count", "8"));
- private Map<String, ContainerModel> containerModels = TestDiagnosticsStreamMessage.getSampleContainerModels();
- private Collection<DiagnosticsExceptionEvent> exceptionEventList = TestDiagnosticsStreamMessage.getExceptionList();
+ private final Map<String, ContainerModel> containerModels = TestDiagnosticsStreamMessage.getSampleContainerModels();
+ private final Collection<DiagnosticsExceptionEvent> exceptionEventList =
+ TestDiagnosticsStreamMessage.getExceptionList();
+ private Clock clock;
@Before
public void setup() {
-
// Mocked system producer for publishing to diagnostics stream
mockSystemProducer = new MockSystemProducer();
@@ -83,25 +91,30 @@ public class TestDiagnosticsManager {
.mock(ScheduledFuture.class);
});
+ this.clock = Mockito.mock(Clock.class);
+ // The first call supplies the reset time; each later call supplies the timestamp used when a message is published
+ Mockito.when(this.clock.currentTimeMillis()).thenReturn(RESET_TIME, FIRST_SEND_TIME, SECOND_SEND_TIME);
+
this.diagnosticsManager =
- new DiagnosticsManager(jobName, jobId, containerModels, containerMb, containerNumCores, numPersistentStores, maxHeapSize, containerThreadPoolSize,
- "0", executionEnvContainerId, taskClassVersion, samzaVersion, hostname, diagnosticsSystemStream,
- mockSystemProducer, Duration.ofSeconds(1), mockExecutorService, autosizingEnabled, config);
+ new DiagnosticsManager(JOB_NAME, JOB_ID, containerModels, CONTAINER_MB, CONTAINER_NUM_CORES,
+ NUM_PERSISTENT_STORES, MAX_HEAP_SIZE, CONTAINER_THREAD_POOL_SIZE, "0", EXECUTION_ENV_CONTAINER_ID,
+ SAMZA_EPOCH_ID, TASK_CLASS_VERSION, SAMZA_VERSION, HOSTNAME, diagnosticsSystemStream,
+ mockSystemProducer, Duration.ofSeconds(1), mockExecutorService, AUTOSIZING_ENABLED, config, this.clock);
exceptionEventList.forEach(
diagnosticsExceptionEvent -> this.diagnosticsManager.addExceptionEvent(diagnosticsExceptionEvent));
- this.diagnosticsManager.addProcessorStopEvent("0", executionEnvContainerId, hostname, 101);
+ this.diagnosticsManager.addProcessorStopEvent("0", EXECUTION_ENV_CONTAINER_ID, HOSTNAME, 101);
}
@Test
public void testDiagnosticsManagerStart() {
SystemProducer mockSystemProducer = Mockito.mock(SystemProducer.class);
DiagnosticsManager diagnosticsManager =
- new DiagnosticsManager(jobName, jobId, containerModels, containerMb, containerNumCores, numPersistentStores,
- maxHeapSize, containerThreadPoolSize, "0", executionEnvContainerId, taskClassVersion, samzaVersion,
- hostname, diagnosticsSystemStream, mockSystemProducer, Duration.ofSeconds(1), mockExecutorService,
- autosizingEnabled, config);
+ new DiagnosticsManager(JOB_NAME, JOB_ID, containerModels, CONTAINER_MB, CONTAINER_NUM_CORES,
+ NUM_PERSISTENT_STORES, MAX_HEAP_SIZE, CONTAINER_THREAD_POOL_SIZE, "0", EXECUTION_ENV_CONTAINER_ID,
+ SAMZA_EPOCH_ID, TASK_CLASS_VERSION, SAMZA_VERSION, HOSTNAME, diagnosticsSystemStream,
+ mockSystemProducer, Duration.ofSeconds(1), mockExecutorService, AUTOSIZING_ENABLED, config, this.clock);
diagnosticsManager.start();
@@ -117,10 +130,10 @@ public class TestDiagnosticsManager {
Mockito.when(mockExecutorService.isTerminated()).thenReturn(true);
Duration terminationDuration = Duration.ofSeconds(1);
DiagnosticsManager diagnosticsManager =
- new DiagnosticsManager(jobName, jobId, containerModels, containerMb, containerNumCores, numPersistentStores,
- maxHeapSize, containerThreadPoolSize, "0", executionEnvContainerId, taskClassVersion, samzaVersion,
- hostname, diagnosticsSystemStream, mockSystemProducer, terminationDuration, mockExecutorService,
- autosizingEnabled, config);
+ new DiagnosticsManager(JOB_NAME, JOB_ID, containerModels, CONTAINER_MB, CONTAINER_NUM_CORES,
+ NUM_PERSISTENT_STORES, MAX_HEAP_SIZE, CONTAINER_THREAD_POOL_SIZE, "0", EXECUTION_ENV_CONTAINER_ID,
+ SAMZA_EPOCH_ID, TASK_CLASS_VERSION, SAMZA_VERSION, HOSTNAME, diagnosticsSystemStream,
+ mockSystemProducer, terminationDuration, mockExecutorService, AUTOSIZING_ENABLED, config, this.clock);
diagnosticsManager.stop();
@@ -137,10 +150,10 @@ public class TestDiagnosticsManager {
Mockito.when(mockExecutorService.isTerminated()).thenReturn(false);
Duration terminationDuration = Duration.ofSeconds(1);
DiagnosticsManager diagnosticsManager =
- new DiagnosticsManager(jobName, jobId, containerModels, containerMb, containerNumCores, numPersistentStores,
- maxHeapSize, containerThreadPoolSize, "0", executionEnvContainerId, taskClassVersion, samzaVersion,
- hostname, diagnosticsSystemStream, mockSystemProducer, terminationDuration, mockExecutorService,
- autosizingEnabled, config);
+ new DiagnosticsManager(JOB_NAME, JOB_ID, containerModels, CONTAINER_MB, CONTAINER_NUM_CORES,
+ NUM_PERSISTENT_STORES, MAX_HEAP_SIZE, CONTAINER_THREAD_POOL_SIZE, "0", EXECUTION_ENV_CONTAINER_ID,
+ SAMZA_EPOCH_ID, TASK_CLASS_VERSION, SAMZA_VERSION, HOSTNAME, diagnosticsSystemStream,
+ mockSystemProducer, terminationDuration, mockExecutorService, AUTOSIZING_ENABLED, config, this.clock);
diagnosticsManager.stop();
@@ -168,7 +181,7 @@ public class TestDiagnosticsManager {
Assert.assertEquals("One message should have been published", 1, mockSystemProducer.getEnvelopeList().size());
OutgoingMessageEnvelope outgoingMessageEnvelope = mockSystemProducer.getEnvelopeList().get(0);
- validateMetricsHeader(outgoingMessageEnvelope);
+ validateMetricsHeader(outgoingMessageEnvelope, FIRST_SEND_TIME);
validateOutgoingMessageEnvelope(outgoingMessageEnvelope);
}
@@ -176,19 +189,19 @@ public class TestDiagnosticsManager {
public void testSecondPublishWithProcessorStopInSecondMessage() {
// Across two successive run() invocations two messages should be published if stop events are added
this.diagnosticsManager.start();
- this.diagnosticsManager.addProcessorStopEvent("0", executionEnvContainerId, hostname, 102);
+ this.diagnosticsManager.addProcessorStopEvent("0", EXECUTION_ENV_CONTAINER_ID, HOSTNAME, 102);
this.diagnosticsManager.start();
Assert.assertEquals("Two messages should have been published", 2, mockSystemProducer.getEnvelopeList().size());
// Validate the first message
OutgoingMessageEnvelope outgoingMessageEnvelope = mockSystemProducer.getEnvelopeList().get(0);
- validateMetricsHeader(outgoingMessageEnvelope);
+ validateMetricsHeader(outgoingMessageEnvelope, FIRST_SEND_TIME);
validateOutgoingMessageEnvelope(outgoingMessageEnvelope);
// Validate the second message's header
outgoingMessageEnvelope = mockSystemProducer.getEnvelopeList().get(1);
- validateMetricsHeader(outgoingMessageEnvelope);
+ validateMetricsHeader(outgoingMessageEnvelope, SECOND_SEND_TIME);
// Validate the second message's body (should be all empty except for the processor-stop-event)
MetricsSnapshot metricsSnapshot =
@@ -199,7 +212,7 @@ public class TestDiagnosticsManager {
Assert.assertNull(diagnosticsStreamMessage.getContainerMb());
Assert.assertNull(diagnosticsStreamMessage.getExceptionEvents());
Assert.assertEquals(diagnosticsStreamMessage.getProcessorStopEvents(),
- Arrays.asList(new ProcessorStopEvent("0", executionEnvContainerId, hostname, 102)));
+ Arrays.asList(new ProcessorStopEvent("0", EXECUTION_ENV_CONTAINER_ID, HOSTNAME, 102)));
Assert.assertNull(diagnosticsStreamMessage.getContainerModels());
Assert.assertNull(diagnosticsStreamMessage.getContainerNumCores());
Assert.assertNull(diagnosticsStreamMessage.getNumPersistentStores());
@@ -217,12 +230,12 @@ public class TestDiagnosticsManager {
// Validate the first message
OutgoingMessageEnvelope outgoingMessageEnvelope = mockSystemProducer.getEnvelopeList().get(0);
- validateMetricsHeader(outgoingMessageEnvelope);
+ validateMetricsHeader(outgoingMessageEnvelope, FIRST_SEND_TIME);
validateOutgoingMessageEnvelope(outgoingMessageEnvelope);
// Validate the second message's header
outgoingMessageEnvelope = mockSystemProducer.getEnvelopeList().get(1);
- validateMetricsHeader(outgoingMessageEnvelope);
+ validateMetricsHeader(outgoingMessageEnvelope, SECOND_SEND_TIME);
// Validate the second message's body (should be all empty except for the processor-stop-event)
MetricsSnapshot metricsSnapshot =
@@ -243,22 +256,17 @@ public class TestDiagnosticsManager {
this.diagnosticsManager.stop();
}
- private void validateMetricsHeader(OutgoingMessageEnvelope outgoingMessageEnvelope) {
+ private void validateMetricsHeader(OutgoingMessageEnvelope outgoingMessageEnvelope, long sendTime) {
// Validate the outgoing message
Assert.assertTrue(outgoingMessageEnvelope.getSystemStream().equals(diagnosticsSystemStream));
MetricsSnapshot metricsSnapshot =
new MetricsSnapshotSerdeV2().fromBytes((byte[]) outgoingMessageEnvelope.getMessage());
- // Validate all header fields
- Assert.assertEquals(metricsSnapshot.getHeader().getJobName(), jobName);
- Assert.assertEquals(metricsSnapshot.getHeader().getJobId(), jobId);
- Assert.assertEquals(metricsSnapshot.getHeader().getExecEnvironmentContainerId(), executionEnvContainerId);
- Assert.assertEquals(metricsSnapshot.getHeader().getVersion(), taskClassVersion);
- Assert.assertEquals(metricsSnapshot.getHeader().getSamzaVersion(), samzaVersion);
- Assert.assertEquals(metricsSnapshot.getHeader().getHost(), hostname);
- Assert.assertEquals(metricsSnapshot.getHeader().getSource(), DiagnosticsManager.class.getName());
-
+ MetricsHeader expectedHeader = new MetricsHeader(JOB_NAME, JOB_ID, "samza-container-0", EXECUTION_ENV_CONTAINER_ID,
+ Optional.of(SAMZA_EPOCH_ID), DiagnosticsManager.class.getName(), TASK_CLASS_VERSION, SAMZA_VERSION,
+ HOSTNAME, sendTime, RESET_TIME);
+ Assert.assertEquals(expectedHeader, metricsSnapshot.getHeader());
}
private void validateOutgoingMessageEnvelope(OutgoingMessageEnvelope outgoingMessageEnvelope) {
@@ -269,15 +277,16 @@ public class TestDiagnosticsManager {
DiagnosticsStreamMessage diagnosticsStreamMessage =
DiagnosticsStreamMessage.convertToDiagnosticsStreamMessage(metricsSnapshot);
- Assert.assertEquals(containerMb, diagnosticsStreamMessage.getContainerMb().intValue());
- Assert.assertEquals(maxHeapSize, diagnosticsStreamMessage.getMaxHeapSize().longValue());
- Assert.assertEquals(containerThreadPoolSize, diagnosticsStreamMessage.getContainerThreadPoolSize().intValue());
+ Assert.assertEquals(CONTAINER_MB, diagnosticsStreamMessage.getContainerMb().intValue());
+ Assert.assertEquals(MAX_HEAP_SIZE, diagnosticsStreamMessage.getMaxHeapSize().longValue());
+ Assert.assertEquals(CONTAINER_THREAD_POOL_SIZE, diagnosticsStreamMessage.getContainerThreadPoolSize().intValue());
Assert.assertEquals(exceptionEventList, diagnosticsStreamMessage.getExceptionEvents());
- Assert.assertEquals(diagnosticsStreamMessage.getProcessorStopEvents(), Arrays.asList(new ProcessorStopEvent("0", executionEnvContainerId, hostname, 101)));
+ Assert.assertEquals(diagnosticsStreamMessage.getProcessorStopEvents(), Arrays.asList(new ProcessorStopEvent("0",
+ EXECUTION_ENV_CONTAINER_ID, HOSTNAME, 101)));
Assert.assertEquals(containerModels, diagnosticsStreamMessage.getContainerModels());
- Assert.assertEquals(containerNumCores, diagnosticsStreamMessage.getContainerNumCores().intValue());
- Assert.assertEquals(numPersistentStores, diagnosticsStreamMessage.getNumPersistentStores().intValue());
- Assert.assertEquals(autosizingEnabled, diagnosticsStreamMessage.getAutosizingEnabled());
+ Assert.assertEquals(CONTAINER_NUM_CORES, diagnosticsStreamMessage.getContainerNumCores().intValue());
+ Assert.assertEquals(NUM_PERSISTENT_STORES, diagnosticsStreamMessage.getNumPersistentStores().intValue());
+ Assert.assertEquals(AUTOSIZING_ENABLED, diagnosticsStreamMessage.getAutosizingEnabled());
Assert.assertEquals(config, diagnosticsStreamMessage.getConfig());
}
diff --git a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java
index 72b1b5f..459fd14 100644
--- a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java
+++ b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import org.apache.samza.Partition;
import org.apache.samza.config.Config;
@@ -32,6 +33,7 @@ import org.apache.samza.config.MapConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metrics.reporter.MetricsHeader;
import org.apache.samza.metrics.reporter.MetricsSnapshot;
import org.apache.samza.system.SystemStreamPartition;
import org.junit.Assert;
@@ -39,22 +41,22 @@ import org.junit.Test;
public class TestDiagnosticsStreamMessage {
-
- private final String jobName = "Testjob";
- private final String jobId = "test job id";
- private final String containerName = "sample container name";
- private final String executionEnvContainerId = "exec container id";
- private final String taskClassVersion = "0.0.1";
- private final String samzaVersion = "1.3.0";
- private final String hostname = "sample host name";
+ private static final String JOB_NAME = "Testjob";
+ private static final String JOB_ID = "test job id";
+ private static final String CONTAINER_NAME = "sample container name";
+ private static final String EXECUTION_ENV_CONTAINER_ID = "exec container id";
+ private static final String SAMZA_EPOCH_ID = "epoch-123";
+ private static final String TASK_CLASS_VERSION = "0.0.1";
+ private static final String SAMZA_VERSION = "1.3.0";
+ private static final String HOSTNAME = "sample host name";
private final long timestamp = System.currentTimeMillis();
private final long resetTimestamp = System.currentTimeMillis();
- private final Config config = new MapConfig(ImmutableMap.of("job.name", jobName, "job.id", jobId));
+ private final Config config = new MapConfig(ImmutableMap.of("job.name", JOB_NAME, "job.id", JOB_ID));
- private DiagnosticsStreamMessage getDiagnosticsStreamMessage() {
+ private DiagnosticsStreamMessage getDiagnosticsStreamMessage(Optional<String> samzaEpochId) {
DiagnosticsStreamMessage diagnosticsStreamMessage =
- new DiagnosticsStreamMessage(jobName, jobId, containerName, executionEnvContainerId, taskClassVersion,
- samzaVersion, hostname, timestamp, resetTimestamp);
+ new DiagnosticsStreamMessage(JOB_NAME, JOB_ID, CONTAINER_NAME, EXECUTION_ENV_CONTAINER_ID, samzaEpochId,
+ TASK_CLASS_VERSION, SAMZA_VERSION, HOSTNAME, timestamp, resetTimestamp);
diagnosticsStreamMessage.addContainerMb(1024);
diagnosticsStreamMessage.addContainerNumCores(2);
@@ -66,10 +68,10 @@ public class TestDiagnosticsStreamMessage {
}
public static Collection<DiagnosticsExceptionEvent> getExceptionList() {
- BoundedList boundedList = new BoundedList<DiagnosticsExceptionEvent>("exceptions");
+ BoundedList<DiagnosticsExceptionEvent> boundedList = new BoundedList<>("exceptions");
DiagnosticsExceptionEvent diagnosticsExceptionEvent =
new DiagnosticsExceptionEvent(1, new Exception("this is a samza exception", new Exception("cause")),
- new HashMap());
+ new HashMap<>());
boundedList.add(diagnosticsExceptionEvent);
return boundedList.getValues();
@@ -77,7 +79,7 @@ public class TestDiagnosticsStreamMessage {
public List<ProcessorStopEvent> getProcessorStopEventList() {
List<ProcessorStopEvent> stopEventList = new ArrayList<>();
- stopEventList.add(new ProcessorStopEvent("0", executionEnvContainerId, hostname, 101));
+ stopEventList.add(new ProcessorStopEvent("0", EXECUTION_ENV_CONTAINER_ID, HOSTNAME, 101));
return stopEventList;
}
@@ -102,8 +104,8 @@ public class TestDiagnosticsStreamMessage {
*/
@Test
public void basicTest() {
-
- DiagnosticsStreamMessage diagnosticsStreamMessage = getDiagnosticsStreamMessage();
+ DiagnosticsStreamMessage diagnosticsStreamMessage =
+ getDiagnosticsStreamMessage(Optional.of(SAMZA_EPOCH_ID));
Collection<DiagnosticsExceptionEvent> exceptionEventList = getExceptionList();
diagnosticsStreamMessage.addDiagnosticsExceptionEvents(exceptionEventList);
diagnosticsStreamMessage.addProcessorStopEvents(getProcessorStopEventList());
@@ -123,20 +125,18 @@ public class TestDiagnosticsStreamMessage {
*/
@Test
public void serdeTest() {
- DiagnosticsStreamMessage diagnosticsStreamMessage = getDiagnosticsStreamMessage();
+ DiagnosticsStreamMessage diagnosticsStreamMessage =
+ getDiagnosticsStreamMessage(Optional.of(SAMZA_EPOCH_ID));
Collection<DiagnosticsExceptionEvent> exceptionEventList = getExceptionList();
diagnosticsStreamMessage.addDiagnosticsExceptionEvents(exceptionEventList);
diagnosticsStreamMessage.addProcessorStopEvents(getProcessorStopEventList());
diagnosticsStreamMessage.addContainerModels(getSampleContainerModels());
MetricsSnapshot metricsSnapshot = diagnosticsStreamMessage.convertToMetricsSnapshot();
- Assert.assertEquals(metricsSnapshot.getHeader().getJobName(), jobName);
- Assert.assertEquals(metricsSnapshot.getHeader().getJobId(), jobId);
- Assert.assertEquals(metricsSnapshot.getHeader().getExecEnvironmentContainerId(), executionEnvContainerId);
- Assert.assertEquals(metricsSnapshot.getHeader().getVersion(), taskClassVersion);
- Assert.assertEquals(metricsSnapshot.getHeader().getSamzaVersion(), samzaVersion);
- Assert.assertEquals(metricsSnapshot.getHeader().getHost(), hostname);
- Assert.assertEquals(metricsSnapshot.getHeader().getSource(), DiagnosticsManager.class.getName());
+ MetricsHeader expectedHeader = new MetricsHeader(JOB_NAME, JOB_ID, CONTAINER_NAME, EXECUTION_ENV_CONTAINER_ID,
+ Optional.of(SAMZA_EPOCH_ID), DiagnosticsManager.class.getName(), TASK_CLASS_VERSION, SAMZA_VERSION,
+ HOSTNAME, timestamp, resetTimestamp);
+ Assert.assertEquals(metricsSnapshot.getHeader(), expectedHeader);
Map<String, Map<String, Object>> metricsMap = metricsSnapshot.getMetrics().getAsMap();
Assert.assertTrue(metricsMap.get("org.apache.samza.container.SamzaContainerMetrics").containsKey("exceptions"));
@@ -149,7 +149,20 @@ public class TestDiagnosticsStreamMessage {
DiagnosticsStreamMessage convertedDiagnosticsStreamMessage =
DiagnosticsStreamMessage.convertToDiagnosticsStreamMessage(metricsSnapshot);
+ Assert.assertEquals(convertedDiagnosticsStreamMessage, diagnosticsStreamMessage);
+ }
- Assert.assertTrue(convertedDiagnosticsStreamMessage.equals(diagnosticsStreamMessage));
+ @Test
+ public void testSerdeEmptySamzaEpochIdInHeader() {
+ DiagnosticsStreamMessage diagnosticsStreamMessage = getDiagnosticsStreamMessage(Optional.empty());
+ MetricsSnapshot metricsSnapshot = diagnosticsStreamMessage.convertToMetricsSnapshot();
+ MetricsHeader expectedHeader =
+ new MetricsHeader(JOB_NAME, JOB_ID, CONTAINER_NAME, EXECUTION_ENV_CONTAINER_ID, Optional.empty(),
+ DiagnosticsManager.class.getName(), TASK_CLASS_VERSION, SAMZA_VERSION, HOSTNAME, timestamp, resetTimestamp);
+ Assert.assertEquals(expectedHeader, metricsSnapshot.getHeader()); // expected value first, then actual
+
+ DiagnosticsStreamMessage convertedDiagnosticsStreamMessage =
+ DiagnosticsStreamMessage.convertToDiagnosticsStreamMessage(metricsSnapshot);
+ Assert.assertEquals(diagnosticsStreamMessage, convertedDiagnosticsStreamMessage); // round trip must be lossless
}
}
diff --git a/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetricsHeader.java b/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetricsHeader.java
index bbd6b30..23ee2a1 100644
--- a/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetricsHeader.java
+++ b/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetricsHeader.java
@@ -20,6 +20,7 @@ package org.apache.samza.metrics.reporter;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -30,6 +31,7 @@ public class TestMetricsHeader {
private static final String JOB_ID = "id-a";
private static final String CONTAINER_NAME = "samza-container-0";
private static final String EXEC_ENV_CONTAINER_ID = "container-12345";
+ private static final String SAMZA_EPOCH_ID = "epoch-12345";
private static final String SOURCE = "metrics-source";
private static final String VERSION = "1.2.3";
private static final String SAMZA_VERSION = "4.5.6";
@@ -40,13 +42,14 @@ public class TestMetricsHeader {
@Test
public void testGetAsMap() {
MetricsHeader metricsHeader =
- new MetricsHeader(JOB_NAME, JOB_ID, CONTAINER_NAME, EXEC_ENV_CONTAINER_ID, SOURCE, VERSION, SAMZA_VERSION, HOST,
- TIME, RESET_TIME);
+ new MetricsHeader(JOB_NAME, JOB_ID, CONTAINER_NAME, EXEC_ENV_CONTAINER_ID, Optional.of(SAMZA_EPOCH_ID),
+ SOURCE, VERSION, SAMZA_VERSION, HOST, TIME, RESET_TIME);
Map<String, Object> expected = new HashMap<>();
expected.put("job-name", JOB_NAME);
expected.put("job-id", JOB_ID);
expected.put("container-name", CONTAINER_NAME);
expected.put("exec-env-container-id", EXEC_ENV_CONTAINER_ID);
+ expected.put("samza-epoch-id", SAMZA_EPOCH_ID);
expected.put("source", SOURCE);
expected.put("version", VERSION);
expected.put("samza-version", SAMZA_VERSION);
@@ -54,6 +57,13 @@ public class TestMetricsHeader {
expected.put("time", TIME);
expected.put("reset-time", RESET_TIME);
assertEquals(expected, metricsHeader.getAsMap());
+
+ // test with empty samza epoch id
+ metricsHeader =
+ new MetricsHeader(JOB_NAME, JOB_ID, CONTAINER_NAME, EXEC_ENV_CONTAINER_ID, Optional.empty(), SOURCE, VERSION,
+ SAMZA_VERSION, HOST, TIME, RESET_TIME);
+ expected.remove("samza-epoch-id");
+ assertEquals(expected, metricsHeader.getAsMap());
}
@Test
@@ -63,6 +73,7 @@ public class TestMetricsHeader {
map.put("job-id", JOB_ID);
map.put("container-name", CONTAINER_NAME);
map.put("exec-env-container-id", EXEC_ENV_CONTAINER_ID);
+ map.put("samza-epoch-id", SAMZA_EPOCH_ID);
map.put("source", SOURCE);
map.put("version", VERSION);
map.put("samza-version", SAMZA_VERSION);
@@ -70,8 +81,15 @@ public class TestMetricsHeader {
map.put("time", TIME);
map.put("reset-time", RESET_TIME);
MetricsHeader expected =
- new MetricsHeader(JOB_NAME, JOB_ID, CONTAINER_NAME, EXEC_ENV_CONTAINER_ID, SOURCE, VERSION, SAMZA_VERSION, HOST,
- TIME, RESET_TIME);
+ new MetricsHeader(JOB_NAME, JOB_ID, CONTAINER_NAME, EXEC_ENV_CONTAINER_ID, Optional.of(SAMZA_EPOCH_ID),
+ SOURCE, VERSION, SAMZA_VERSION, HOST, TIME, RESET_TIME);
+ assertEquals(expected, MetricsHeader.fromMap(map));
+
+ // test with missing samza epoch id
+ map.remove("samza-epoch-id");
+ expected =
+ new MetricsHeader(JOB_NAME, JOB_ID, CONTAINER_NAME, EXEC_ENV_CONTAINER_ID, Optional.empty(), SOURCE, VERSION,
+ SAMZA_VERSION, HOST, TIME, RESET_TIME);
assertEquals(expected, MetricsHeader.fromMap(map));
}
}
diff --git a/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetricsSnapshot.java b/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetricsSnapshot.java
index 9b5c693..43e3cf8 100644
--- a/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetricsSnapshot.java
+++ b/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetricsSnapshot.java
@@ -19,6 +19,7 @@
package org.apache.samza.metrics.reporter;
import java.util.Map;
+import java.util.Optional;
import com.google.common.collect.ImmutableMap;
import org.junit.Test;
@@ -27,7 +28,8 @@ import static org.junit.Assert.assertEquals;
public class TestMetricsSnapshot {
private static final MetricsHeader METRICS_HEADER =
- new MetricsHeader("job", "id", "container", "container-id", "source", "1.2", "3.4", "a.b.c", 100, 10);
+ new MetricsHeader("job", "id", "container", "container-id", Optional.of("epoch-123"), "source", "1.2", "3.4",
+ "a.b.c", 100, 10);
private static final Metrics METRICS =
new Metrics(ImmutableMap.of("group0", ImmutableMap.of("a", "b"), "group1", ImmutableMap.of("c", "d")));
diff --git a/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetricsSnapshotReporter.java b/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetricsSnapshotReporter.java
index c761486..09a72b6 100644
--- a/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetricsSnapshotReporter.java
+++ b/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetricsSnapshotReporter.java
@@ -30,11 +30,12 @@ import org.apache.samza.serializers.Serializer;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStream;
-import org.apache.samza.util.SystemClock;
+import org.apache.samza.util.Clock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
@@ -58,13 +59,18 @@ public class TestMetricsSnapshotReporter {
private static final String SAMZA_VERSION = "test samza version";
private static final String HOSTNAME = "test host";
private static final Duration REPORTING_INTERVAL = Duration.ofSeconds(60000);
+ private static final long RESET_TIME = 10;
+ private static final long SEND_TIME = 20;
private Serializer<MetricsSnapshot> serializer;
private SystemProducer producer;
+ private Clock clock;
@Before
public void setup() {
producer = mock(SystemProducer.class);
+ clock = mock(Clock.class);
+ Mockito.when(clock.currentTimeMillis()).thenReturn(RESET_TIME, SEND_TIME);
serializer = new MetricsSnapshotSerdeV2();
}
@@ -162,13 +168,10 @@ public class TestMetricsSnapshotReporter {
MetricsSnapshot metricsSnapshot = (MetricsSnapshot) envelopes.get(0).getMessage();
- Assert.assertEquals(JOB_NAME, metricsSnapshot.getHeader().getJobName());
- Assert.assertEquals(JOB_ID, metricsSnapshot.getHeader().getJobId());
- Assert.assertEquals(CONTAINER_NAME, metricsSnapshot.getHeader().getContainerName());
- Assert.assertEquals(source, metricsSnapshot.getHeader().getSource());
- Assert.assertEquals(SAMZA_VERSION, metricsSnapshot.getHeader().getSamzaVersion());
- Assert.assertEquals(TASK_VERSION, metricsSnapshot.getHeader().getVersion());
- Assert.assertEquals(HOSTNAME, metricsSnapshot.getHeader().getHost());
+ MetricsHeader expectedHeader =
+ new MetricsHeader(JOB_NAME, JOB_ID, CONTAINER_NAME, "", Optional.of(""), source, TASK_VERSION, SAMZA_VERSION,
+ HOSTNAME, SEND_TIME, RESET_TIME);
+ Assert.assertEquals(expectedHeader, metricsSnapshot.getHeader());
Map<String, Map<String, Object>> metricMap = metricsSnapshot.getMetrics().getAsMap();
Assert.assertEquals(1, metricMap.size());
@@ -179,6 +182,6 @@ public class TestMetricsSnapshotReporter {
private MetricsSnapshotReporter getMetricsSnapshotReporter(Optional<String> blacklist) {
return new MetricsSnapshotReporter(producer, SYSTEM_STREAM, REPORTING_INTERVAL, JOB_NAME, JOB_ID, CONTAINER_NAME,
- TASK_VERSION, SAMZA_VERSION, HOSTNAME, serializer, blacklist.map(Pattern::compile), SystemClock.instance());
+ TASK_VERSION, SAMZA_VERSION, HOSTNAME, serializer, blacklist.map(Pattern::compile), this.clock);
}
}
diff --git a/samza-core/src/test/java/org/apache/samza/serializers/TestMetricsSnapshotSerde.java b/samza-core/src/test/java/org/apache/samza/serializers/TestMetricsSnapshotSerde.java
index b75cbde..35556e9 100644
--- a/samza-core/src/test/java/org/apache/samza/serializers/TestMetricsSnapshotSerde.java
+++ b/samza-core/src/test/java/org/apache/samza/serializers/TestMetricsSnapshotSerde.java
@@ -21,61 +21,88 @@ package org.apache.samza.serializers;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import org.apache.samza.metrics.reporter.Metrics;
import org.apache.samza.metrics.reporter.MetricsHeader;
import org.apache.samza.metrics.reporter.MetricsSnapshot;
import org.junit.Test;
-import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
public class TestMetricsSnapshotSerde {
- private static final String SERIALIZED =
- "{\"header\":{\"job-id\":\"testjobid\",\"exec-env-container-id\":\"test exec env container id\",\"samza-version\":\"samzaversion\",\"job-name\":\"test-jobName\",\"host\":\"host\",\"reset-time\":2,\"container-name\":\"samza-container-0\",\"source\":\"test source\",\"time\":1,\"version\":\"version\"},\"metrics\":{\"test\":{\"test2\":\"foo\"}}}";
+ @Test
+ public void testSerializeThenDeserialize() {
+ MetricsSnapshot snapshot = metricsSnapshot(true);
+ MetricsSnapshotSerde serde = new MetricsSnapshotSerde();
+ byte[] bytes = serde.toBytes(snapshot);
+ assertEquals(snapshot, serde.fromBytes(bytes));
+ }
@Test
- public void testMetricsSerdeSerializeAndDeserialize() {
- Map<String, Object> metricsMap = new HashMap<>();
- metricsMap.put("test2", "foo");
- Map<String, Map<String, Object>> metricsGroupMap = new HashMap<>();
- metricsGroupMap.put("test", metricsMap);
- MetricsSnapshot snapshot = new MetricsSnapshot(metricsHeader(), Metrics.fromMap(metricsGroupMap));
+ public void testSerializeThenDeserializeEmptySamzaEpochIdInHeader() {
+ MetricsSnapshot snapshot = metricsSnapshot(false); // false: header is built without a samza-epoch-id argument
MetricsSnapshotSerde serde = new MetricsSnapshotSerde();
byte[] bytes = serde.toBytes(snapshot);
assertEquals(snapshot, serde.fromBytes(bytes));
}
/**
- * Helps for testing compatibility against older versions of code.
+ * Helps for verifying compatibility when schemas evolve.
+ *
+ * Maps have non-deterministic ordering when serialized, so it is difficult to check exact serialized results.
+ * Checking the exact serialized results isn't really necessary anyway: we only need to make sure that serialized
+ * data can be read by both old and new systems.
*/
@Test
- public void testMetricsSerdeSerialize() {
- Map<String, Object> metricsMap = new HashMap<>();
- metricsMap.put("test2", "foo");
- Map<String, Map<String, Object>> metricsGroupMap = new HashMap<>();
- metricsGroupMap.put("test", metricsMap);
- MetricsSnapshot snapshot = new MetricsSnapshot(metricsHeader(), Metrics.fromMap(metricsGroupMap));
+ public void testDeserializeRaw() {
+ MetricsSnapshot snapshot = metricsSnapshot(true);
MetricsSnapshotSerde serde = new MetricsSnapshotSerde();
- assertArrayEquals(SERIALIZED.getBytes(StandardCharsets.UTF_8), serde.toBytes(snapshot));
+ assertEquals(snapshot, serde.fromBytes(expectedSeralizedSnapshot(true, false).getBytes(StandardCharsets.UTF_8)));
+ assertEquals(snapshot, serde.fromBytes(expectedSeralizedSnapshot(true, true).getBytes(StandardCharsets.UTF_8)));
}
/**
- * Helps for testing compatibility against older versions of code.
+ * Helps for verifying compatibility when schemas evolve.
*/
@Test
- public void testMetricsSerdeDeserialize() {
+ public void testDeserializeRawEmptySamzaEpochIdInHeader() {
+ MetricsSnapshot snapshot = metricsSnapshot(false);
+ MetricsSnapshotSerde serde = new MetricsSnapshotSerde();
+ assertEquals(snapshot, serde.fromBytes(expectedSeralizedSnapshot(false, false).getBytes(StandardCharsets.UTF_8)));
+ assertEquals(snapshot, serde.fromBytes(expectedSeralizedSnapshot(false, true).getBytes(StandardCharsets.UTF_8)));
+ }
+
+ private static String expectedSeralizedSnapshot(boolean includeSamzaEpochId,
+ boolean includeExtraHeaderField) {
+ String serializedSnapshot =
+ "{\"header\":{\"job-id\":\"testjobid\",\"exec-env-container-id\":\"test exec env container id\",";
+ if (includeSamzaEpochId) {
+ serializedSnapshot += "\"samza-epoch-id\":\"epoch-123\",";
+ }
+ if (includeExtraHeaderField) {
+ serializedSnapshot += "\"extra-header-field\":\"extra header value\",";
+ }
+ serializedSnapshot +=
+ "\"samza-version\":\"samzaversion\",\"job-name\":\"test-jobName\",\"host\":\"host\",\"reset-time\":2,"
+ + "\"container-name\":\"samza-container-0\",\"source\":\"test source\",\"time\":1,\"version\":\"version\"},"
+ + "\"metrics\":{\"test\":{\"test2\":\"foo\"}}}";
+ return serializedSnapshot;
+ }
+
+ private static MetricsSnapshot metricsSnapshot(boolean includeSamzaEpochId) {
+ MetricsHeader metricsHeader;
+ if (includeSamzaEpochId) {
+ metricsHeader = new MetricsHeader("test-jobName", "testjobid", "samza-container-0", "test exec env container id",
+ Optional.of("epoch-123"), "test source", "version", "samzaversion", "host", 1L, 2L);
+ } else {
+ metricsHeader = new MetricsHeader("test-jobName", "testjobid", "samza-container-0", "test exec env container id",
+ "test source", "version", "samzaversion", "host", 1L, 2L);
+ }
Map<String, Object> metricsMap = new HashMap<>();
metricsMap.put("test2", "foo");
Map<String, Map<String, Object>> metricsGroupMap = new HashMap<>();
metricsGroupMap.put("test", metricsMap);
- MetricsSnapshot snapshot = new MetricsSnapshot(metricsHeader(), Metrics.fromMap(metricsGroupMap));
- MetricsSnapshotSerde serde = new MetricsSnapshotSerde();
- assertEquals(snapshot, serde.fromBytes(SERIALIZED.getBytes(StandardCharsets.UTF_8)));
- }
-
- private static MetricsHeader metricsHeader() {
- return new MetricsHeader("test-jobName", "testjobid", "samza-container-0", "test exec env container id",
- "test source", "version", "samzaversion", "host", 1L, 2L);
+ return new MetricsSnapshot(metricsHeader, Metrics.fromMap(metricsGroupMap));
}
}
diff --git a/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestMetricsSnapshotSerdeV2.java b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestMetricsSnapshotSerdeV2.java
index 6a3dc21..f0b8ab5 100644
--- a/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestMetricsSnapshotSerdeV2.java
+++ b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestMetricsSnapshotSerdeV2.java
@@ -22,6 +22,7 @@ package org.apache.samza.serializers.model.serializers;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.diagnostics.BoundedList;
@@ -32,15 +33,14 @@ import org.apache.samza.metrics.reporter.MetricsSnapshot;
import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
import org.junit.Test;
-import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
public class TestMetricsSnapshotSerdeV2 {
@Test
- public void testSerializeAndDeserialize() {
+ public void testSerializeThenDeserialize() {
SamzaException samzaException = new SamzaException("this is a samza exception", new RuntimeException("cause"));
- MetricsSnapshot metricsSnapshot = metricsSnapshot(samzaException);
+ MetricsSnapshot metricsSnapshot = metricsSnapshot(samzaException, true);
MetricsSnapshotSerdeV2 metricsSnapshotSerde = new MetricsSnapshotSerdeV2();
byte[] serializedBytes = metricsSnapshotSerde.toBytes(metricsSnapshot);
MetricsSnapshot deserializedMetricsSnapshot = metricsSnapshotSerde.fromBytes(serializedBytes);
@@ -48,34 +48,64 @@ public class TestMetricsSnapshotSerdeV2 {
}
@Test
- public void testSerialize() {
+ public void testSerializeThenDeserializeEmptySamzaEpochIdInHeader() {
SamzaException samzaException = new SamzaException("this is a samza exception", new RuntimeException("cause"));
- MetricsSnapshot metricsSnapshot = metricsSnapshot(samzaException);
+ MetricsSnapshot metricsSnapshot = metricsSnapshot(samzaException, false);
MetricsSnapshotSerdeV2 metricsSnapshotSerde = new MetricsSnapshotSerdeV2();
byte[] serializedBytes = metricsSnapshotSerde.toBytes(metricsSnapshot);
- assertArrayEquals(expectedSeralizedSnapshot(samzaException).getBytes(StandardCharsets.UTF_8), serializedBytes);
+ MetricsSnapshot deserializedMetricsSnapshot = metricsSnapshotSerde.fromBytes(serializedBytes);
+ assertEquals(metricsSnapshot, deserializedMetricsSnapshot);
+ }
+
+ /**
+ * Helps for verifying compatibility when schemas evolve.
+ *
+ * Maps have non-deterministic ordering when serialized, so it is difficult to check exact serialized results.
+ * Checking the exact serialized results isn't really necessary anyway: we only need to make sure that serialized
+ * data can be read by both old and new systems.
+ */
+ @Test
+ public void testDeserializeRaw() {
+ SamzaException samzaException = new SamzaException("this is a samza exception", new RuntimeException("cause"));
+ MetricsSnapshot metricsSnapshot = metricsSnapshot(samzaException, true);
+ MetricsSnapshotSerdeV2 metricsSnapshotSerde = new MetricsSnapshotSerdeV2();
+ assertEquals(metricsSnapshot, metricsSnapshotSerde.fromBytes(
+ expectedSeralizedSnapshot(samzaException, true, false).getBytes(StandardCharsets.UTF_8)));
+ assertEquals(metricsSnapshot, metricsSnapshotSerde.fromBytes(
+ expectedSeralizedSnapshot(samzaException, true, true).getBytes(StandardCharsets.UTF_8)));
}
+ /**
+ * Helps for verifying compatibility when schemas evolve.
+ */
@Test
- public void testDeserialize() {
+ public void testDeserializeRawEmptySamzaEpochIdInHeader() {
SamzaException samzaException = new SamzaException("this is a samza exception", new RuntimeException("cause"));
- MetricsSnapshot expectedSnapshot = metricsSnapshot(samzaException);
+ MetricsSnapshot metricsSnapshot = metricsSnapshot(samzaException, false);
MetricsSnapshotSerdeV2 metricsSnapshotSerde = new MetricsSnapshotSerdeV2();
- MetricsSnapshot deserializedSnapshot =
- metricsSnapshotSerde.fromBytes(expectedSeralizedSnapshot(samzaException).getBytes(StandardCharsets.UTF_8));
- assertEquals(expectedSnapshot, deserializedSnapshot);
+ assertEquals(metricsSnapshot, metricsSnapshotSerde.fromBytes(
+ expectedSeralizedSnapshot(samzaException, false, false).getBytes(StandardCharsets.UTF_8)));
+ assertEquals(metricsSnapshot, metricsSnapshotSerde.fromBytes(
+ expectedSeralizedSnapshot(samzaException, false, true).getBytes(StandardCharsets.UTF_8)));
}
- private static MetricsSnapshot metricsSnapshot(Exception exception) {
- MetricsHeader metricsHeader =
- new MetricsHeader("jobName", "i001", "container 0", "test container ID", "source", "300.14.25.1", "1", "1", 1,
- 1);
+ private static MetricsSnapshot metricsSnapshot(Exception exception, boolean includeSamzaEpochId) {
+ MetricsHeader metricsHeader;
+ if (includeSamzaEpochId) {
+ metricsHeader =
+ new MetricsHeader("jobName", "i001", "container 0", "test container ID", Optional.of("epoch-123"),
+ "source", "300.14.25.1", "1", "1", 1, 1);
+ } else {
+ metricsHeader =
+ new MetricsHeader("jobName", "i001", "container 0", "test container ID", "source", "300.14.25.1", "1", "1", 1,
+ 1);
+ }
BoundedList<DiagnosticsExceptionEvent> boundedList = new BoundedList<>("exceptions");
DiagnosticsExceptionEvent diagnosticsExceptionEvent = new DiagnosticsExceptionEvent(1, exception, new HashMap<>());
boundedList.add(diagnosticsExceptionEvent);
Map<String, Map<String, Object>> metricMessage = new HashMap<>();
Map<String, Object> samzaContainerMetrics = new HashMap<>();
- samzaContainerMetrics.put("commit-calls", 0);
+ samzaContainerMetrics.put("commit-calls", 1);
metricMessage.put("org.apache.samza.container.SamzaContainerMetrics", samzaContainerMetrics);
Map<String, Object> exceptions = new HashMap<>();
exceptions.put("exceptions", boundedList.getValues());
@@ -83,13 +113,34 @@ public class TestMetricsSnapshotSerdeV2 {
return new MetricsSnapshot(metricsHeader, new Metrics(metricMessage));
}
- private static String expectedSeralizedSnapshot(Exception exception) {
+ /**
+ * @param includeSamzaEpochId include the new samza-epoch-id field (for testing that new code can read old data
+ * without this field)
+ * @param includeExtraHeaderField include an extra new field (for testing that old code can read new data with extra
+ * fields)
+ */
+ private static String expectedSeralizedSnapshot(Exception exception, boolean includeSamzaEpochId,
+ boolean includeExtraHeaderField) {
String stackTrace = ExceptionUtils.getStackTrace(exception);
+ String serializedSnapshot =
+ "{\"header\":[\"java.util.HashMap\",{\"job-id\":\"i001\",\"exec-env-container-id\":\"test container ID\",";
+ if (includeSamzaEpochId) {
+ serializedSnapshot += "\"samza-epoch-id\":\"epoch-123\",";
+ }
+ if (includeExtraHeaderField) {
+ serializedSnapshot += "\"extra-header-field\":\"extra header value\",";
+ }
// in serialized string, backslash in whitespace characters (e.g. \n, \t) are escaped
String escapedStackTrace = stackTrace.replace("\n", "\\n").replace("\t", "\\t");
- return
- "{\"header\":[\"java.util.HashMap\",{\"job-id\":\"i001\",\"exec-env-container-id\":\"test container ID\",\"samza-version\":\"1\",\"job-name\":\"jobName\",\"host\":\"1\",\"reset-time\":[\"java.lang.Long\",1],\"container-name\":\"container 0\",\"source\":\"source\",\"time\":[\"java.lang.Long\",1],\"version\":\"300.14.25.1\"}],\"metrics\":[\"java.util.HashMap\",{\"org.apache.samza.exceptions\":[\"java.util.HashMap\",{\"exceptions\":[\"java.util.Collections$UnmodifiableRandomAccessLi [...]
- + escapedStackTrace
- + "\",\"mdcMap\":[\"java.util.HashMap\",{}]}]]]}],\"org.apache.samza.container.SamzaContainerMetrics\":[\"java.util.HashMap\",{\"commit-calls\":0}]}]}";
+ serializedSnapshot +=
+ "\"samza-version\":\"1\",\"job-name\":\"jobName\",\"host\":\"1\",\"reset-time\":[\"java.lang.Long\",1],"
+ + "\"container-name\":\"container 0\",\"source\":\"source\",\"time\":[\"java.lang.Long\",1],\"version\":\"300.14.25.1\"}],"
+ + "\"metrics\":[\"java.util.HashMap\",{\"org.apache.samza.exceptions\":"
+ + "[\"java.util.HashMap\",{\"exceptions\":[\"java.util.Collections$UnmodifiableRandomAccessList\","
+ + "[[\"org.apache.samza.diagnostics.DiagnosticsExceptionEvent\",{\"timestamp\":1,\"exceptionType\":\"org.apache.samza.SamzaException\","
+ + "\"exceptionMessage\":\"this is a samza exception\",\"compactExceptionStackTrace\":\"" + escapedStackTrace
+ + "\",\"mdcMap\":[\"java.util.HashMap\",{}]}]]]}],\"org.apache.samza.container.SamzaContainerMetrics\":"
+ + "[\"java.util.HashMap\",{\"commit-calls\":1}]}]}";
+ return serializedSnapshot;
}
}
diff --git a/samza-core/src/test/java/org/apache/samza/util/TestDiagnosticsUtil.java b/samza-core/src/test/java/org/apache/samza/util/TestDiagnosticsUtil.java
index e10c551..09bca34 100644
--- a/samza-core/src/test/java/org/apache/samza/util/TestDiagnosticsUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/util/TestDiagnosticsUtil.java
@@ -50,6 +50,7 @@ public class TestDiagnosticsUtil {
private static final String JOB_ID = "someId";
private static final String CONTAINER_ID = "someContainerId";
private static final String ENV_ID = "someEnvID";
+ private static final String SAMZA_EPOCH_ID = "someEpochID";
public static final String REPORTER_FACTORY = "org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory";
public static final String SYSTEM_FACTORY = "com.foo.system.SomeSystemFactory";
@@ -65,7 +66,7 @@ public class TestDiagnosticsUtil {
Optional<DiagnosticsManager> diagnosticsManager =
DiagnosticsUtil.buildDiagnosticsManager(JOB_NAME, JOB_ID, mockJobModel, CONTAINER_ID, Optional.of(ENV_ID),
- config);
+ Optional.of(SAMZA_EPOCH_ID), config);
Assert.assertTrue(diagnosticsManager.isPresent());
}