You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ca...@apache.org on 2021/12/07 00:30:12 UTC

[samza] branch master updated: SAMZA-2075: Add execEnvContainerId and execEnvAttemptId for diagnostics for Kubernetes job coordinator (#1566)

This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 1daca89  SAMZA-2075: Add execEnvContainerId and execEnvAttemptId for diagnostics for Kubernetes job coordinator (#1566)
1daca89 is described below

commit 1daca8923396492496732175d87c0367b21355df
Author: Cameron Lee <ca...@linkedin.com>
AuthorDate: Mon Dec 6 16:30:05 2021 -0800

    SAMZA-2075: Add execEnvContainerId and execEnvAttemptId for diagnostics for Kubernetes job coordinator (#1566)
    
    API changes (backwards compatible):
    JSON representation of MetricsHeader (used in MetricsSnapshot) has an additional optional field called samza-epoch-id, and this field is intended to contain the deployment attempt id which is shared across all containers (job coordinator and workers) of a Samza job. This field is filled in by reading the SAMZA_EPOCH_ID environment variable.
---
 .../clustermanager/ContainerProcessManager.java    |   2 +-
 .../StaticResourceJobCoordinator.java              |  23 ++--
 .../StaticResourceJobCoordinatorFactory.java       |  12 +-
 .../org/apache/samza/metrics/reporter/Metrics.java |   2 +-
 .../samza/metrics/reporter/MetricsHeader.java      |  42 +++++--
 .../samza/metrics/reporter/MetricsSnapshot.java    |   2 +-
 .../metrics/reporter/MetricsSnapshotReporter.java  |  12 +-
 .../apache/samza/processor/StreamProcessor.java    |   3 +-
 .../apache/samza/runtime/ContainerLaunchUtil.java  |  28 +++--
 .../apache/samza/runtime/LocalContainerRunner.java |  11 +-
 .../org/apache/samza/util/DiagnosticsUtil.java     |  15 +--
 .../samza/diagnostics/DiagnosticsManager.java      |  28 +++--
 .../diagnostics/DiagnosticsStreamMessage.java      |  18 ++-
 .../TestStaticResourceJobCoordinator.java          |  27 +++--
 .../samza/diagnostics/TestDiagnosticsManager.java  | 125 +++++++++++----------
 .../diagnostics/TestDiagnosticsStreamMessage.java  |  65 ++++++-----
 .../samza/metrics/reporter/TestMetricsHeader.java  |  26 ++++-
 .../metrics/reporter/TestMetricsSnapshot.java      |   4 +-
 .../reporter/TestMetricsSnapshotReporter.java      |  21 ++--
 .../serializers/TestMetricsSnapshotSerde.java      |  81 ++++++++-----
 .../serializers/TestMetricsSnapshotSerdeV2.java    |  93 +++++++++++----
 .../org/apache/samza/util/TestDiagnosticsUtil.java |   3 +-
 22 files changed, 420 insertions(+), 223 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index 4ef9c68..254e16e 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -161,7 +161,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
     Optional<String> execEnvContainerId = Optional.ofNullable(System.getenv(EXEC_ENV_CONTAINER_ID_SYS_PROPERTY));
     this.diagnosticsManager =
         DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, state.jobModelManager.jobModel(), METRICS_SOURCE_NAME,
-            execEnvContainerId, config);
+            execEnvContainerId, Optional.empty(), config);
 
     this.localityManager = localityManager;
     // Wire all metrics to all reporters
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java
index 25038e9..a14fa89 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java
@@ -78,6 +78,8 @@ public class StaticResourceJobCoordinator implements JobCoordinator {
   private final MetricsRegistry metrics;
   private final SystemAdmins systemAdmins;
   private final String processorId;
+  private final Optional<String> executionEnvContainerId;
+  private final Optional<String> samzaEpochId;
   private final Config config;
 
   private volatile Optional<JobCoordinatorListener> jobCoordinatorListener = Optional.empty();
@@ -106,21 +108,24 @@ public class StaticResourceJobCoordinator implements JobCoordinator {
       JobInfoServingContext jobModelServingContext, CoordinatorCommunication coordinatorCommunication,
       JobCoordinatorMetadataManager jobCoordinatorMetadataManager,
       StreamPartitionCountMonitorFactory streamPartitionCountMonitorFactory,
-      StreamRegexMonitorFactory streamRegexMonitorFactory, StartpointManager startpointManager,
+      StreamRegexMonitorFactory streamRegexMonitorFactory, Optional<StartpointManager> startpointManager,
       ChangelogStreamManager changelogStreamManager, JobRestartSignal jobRestartSignal, MetricsRegistry metrics,
-      SystemAdmins systemAdmins, Config config) {
+      SystemAdmins systemAdmins, Optional<String> executionEnvContainerId, Optional<String> samzaEpochId,
+      Config config) {
     this.jobModelHelper = jobModelHelper;
     this.jobModelServingContext = jobModelServingContext;
     this.coordinatorCommunication = coordinatorCommunication;
     this.jobCoordinatorMetadataManager = jobCoordinatorMetadataManager;
     this.streamPartitionCountMonitorFactory = streamPartitionCountMonitorFactory;
     this.streamRegexMonitorFactory = streamRegexMonitorFactory;
-    this.startpointManager = Optional.ofNullable(startpointManager);
+    this.startpointManager = startpointManager;
     this.changelogStreamManager = changelogStreamManager;
     this.jobRestartSignal = jobRestartSignal;
     this.metrics = metrics;
     this.systemAdmins = systemAdmins;
     this.processorId = processorId;
+    this.executionEnvContainerId = executionEnvContainerId;
+    this.samzaEpochId = samzaEpochId;
     this.config = config;
   }
 
@@ -232,9 +237,9 @@ public class StaticResourceJobCoordinator implements JobCoordinator {
   private Optional<DiagnosticsManager> diagnosticsManager(JobModel jobModel) {
     JobConfig jobConfig = new JobConfig(this.config);
     String jobName = jobConfig.getName().orElseThrow(() -> new ConfigException("Missing job name"));
-    // TODO SAMZA-2705: construct execEnvContainerId for diagnostics
     return buildDiagnosticsManager(jobName, jobConfig.getJobId(), jobModel,
-        CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME, Optional.empty(), this.config);
+        CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME, this.executionEnvContainerId, this.samzaEpochId,
+        this.config);
   }
 
   /**
@@ -271,9 +276,11 @@ public class StaticResourceJobCoordinator implements JobCoordinator {
    * Wrapper around {@link DiagnosticsUtil#buildDiagnosticsManager} so it can be stubbed during testing.
    */
   @VisibleForTesting
-  Optional<DiagnosticsManager> buildDiagnosticsManager(String jobName,
-      String jobId, JobModel jobModel, String containerId, Optional<String> execEnvContainerId, Config config) {
-    return DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, containerId, execEnvContainerId, config);
+  Optional<DiagnosticsManager> buildDiagnosticsManager(String jobName, String jobId, JobModel jobModel,
+      String containerId, Optional<String> executionEnvContainerId, Optional<String> samzaEpochId,
+      Config config) {
+    return DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, containerId, executionEnvContainerId,
+        samzaEpochId, config);
   }
 
   private Set<JobMetadataChange> checkForMetadataChanges(JobCoordinatorMetadata newMetadata) {
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinatorFactory.java
index 718778d..b8704bf 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinatorFactory.java
@@ -18,9 +18,11 @@
  */
 package org.apache.samza.coordinator.staticresource;
 
+import java.util.Optional;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.grouper.task.TaskAssignmentManager;
 import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
@@ -44,6 +46,7 @@ import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMes
 import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
 import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
 import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
+import org.apache.samza.environment.EnvironmentVariables;
 import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
 import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.metrics.MetricsRegistry;
@@ -73,8 +76,8 @@ public class StaticResourceJobCoordinatorFactory implements JobCoordinatorFactor
     JobRestartSignal jobRestartSignal =
         ReflectionUtil.getObj(new JobCoordinatorConfig(config).getJobRestartSignalFactory(),
             JobRestartSignalFactory.class).build(new JobRestartSignalFactoryContext(config));
-    StartpointManager startpointManager =
-        jobConfig.getStartpointEnabled() ? new StartpointManager(metadataStore) : null;
+    Optional<StartpointManager> startpointManager =
+        jobConfig.getStartpointEnabled() ? Optional.of(new StartpointManager(metadataStore)) : Optional.empty();
     SystemAdmins systemAdmins = new SystemAdmins(config, StaticResourceJobCoordinator.class.getSimpleName());
     StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
     JobModelHelper jobModelHelper = buildJobModelHelper(metadataStore, streamMetadataCache);
@@ -82,10 +85,13 @@ public class StaticResourceJobCoordinatorFactory implements JobCoordinatorFactor
         new StreamPartitionCountMonitorFactory(streamMetadataCache, metricsRegistry);
     StreamRegexMonitorFactory streamRegexMonitorFactory =
         new StreamRegexMonitorFactory(streamMetadataCache, metricsRegistry);
+    Optional<String> executionEnvContainerId =
+        Optional.ofNullable(System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID));
+    Optional<String> samzaEpochId = Optional.ofNullable(System.getenv(EnvironmentVariables.SAMZA_EPOCH_ID));
     return new StaticResourceJobCoordinator(processorId, jobModelHelper, jobModelServingContext,
         coordinatorCommunication, jobCoordinatorMetadataManager, streamPartitionCountMonitorFactory,
         streamRegexMonitorFactory, startpointManager, changelogStreamManager, jobRestartSignal, metricsRegistry,
-        systemAdmins, config);
+        systemAdmins, executionEnvContainerId, samzaEpochId, config);
   }
 
   private static JobModelHelper buildJobModelHelper(MetadataStore metadataStore,
diff --git a/samza-core/src/main/java/org/apache/samza/metrics/reporter/Metrics.java b/samza-core/src/main/java/org/apache/samza/metrics/reporter/Metrics.java
index 8c321c5..fa86565 100644
--- a/samza-core/src/main/java/org/apache/samza/metrics/reporter/Metrics.java
+++ b/samza-core/src/main/java/org/apache/samza/metrics/reporter/Metrics.java
@@ -72,6 +72,6 @@ public class Metrics {
 
   @Override
   public String toString() {
-    return "MetricsJava{" + "immutableMetrics=" + immutableMetrics + '}';
+    return "Metrics{" + "immutableMetrics=" + immutableMetrics + '}';
   }
 }
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/metrics/reporter/MetricsHeader.java b/samza-core/src/main/java/org/apache/samza/metrics/reporter/MetricsHeader.java
index 5215038..29d7f10 100644
--- a/samza-core/src/main/java/org/apache/samza/metrics/reporter/MetricsHeader.java
+++ b/samza-core/src/main/java/org/apache/samza/metrics/reporter/MetricsHeader.java
@@ -21,6 +21,7 @@ package org.apache.samza.metrics.reporter;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 
 
 public class MetricsHeader {
@@ -28,6 +29,7 @@ public class MetricsHeader {
   private static final String JOB_ID = "job-id";
   private static final String CONTAINER_NAME = "container-name";
   private static final String EXEC_ENV_CONTAINER_ID = "exec-env-container-id";
+  private static final String SAMZA_EPOCH_ID = "samza-epoch-id";
   private static final String SOURCE = "source";
   private static final String VERSION = "version";
   private static final String SAMZA_VERSION = "samza-version";
@@ -39,6 +41,10 @@ public class MetricsHeader {
   private final String jobId;
   private final String containerName;
   private final String execEnvironmentContainerId;
+  /**
+   * This is optional for backwards compatibility. It was added after the initial version of this class.
+   */
+  private final Optional<String> samzaEpochId;
   private final String source;
   private final String version;
   private final String samzaVersion;
@@ -48,10 +54,18 @@ public class MetricsHeader {
 
   public MetricsHeader(String jobName, String jobId, String containerName, String execEnvironmentContainerId,
       String source, String version, String samzaVersion, String host, long time, long resetTime) {
+    this(jobName, jobId, containerName, execEnvironmentContainerId, Optional.empty(), source, version, samzaVersion,
+        host, time, resetTime);
+  }
+
+  public MetricsHeader(String jobName, String jobId, String containerName, String execEnvironmentContainerId,
+      Optional<String> samzaEpochId, String source, String version, String samzaVersion, String host, long time,
+      long resetTime) {
     this.jobName = jobName;
     this.jobId = jobId;
     this.containerName = containerName;
     this.execEnvironmentContainerId = execEnvironmentContainerId;
+    this.samzaEpochId = samzaEpochId;
     this.source = source;
     this.version = version;
     this.samzaVersion = samzaVersion;
@@ -66,6 +80,7 @@ public class MetricsHeader {
     map.put(JOB_ID, jobId);
     map.put(CONTAINER_NAME, containerName);
     map.put(EXEC_ENV_CONTAINER_ID, execEnvironmentContainerId);
+    this.samzaEpochId.ifPresent(epochId -> map.put(SAMZA_EPOCH_ID, epochId));
     map.put(SOURCE, source);
     map.put(VERSION, version);
     map.put(SAMZA_VERSION, samzaVersion);
@@ -91,6 +106,14 @@ public class MetricsHeader {
     return execEnvironmentContainerId;
   }
 
+  /**
+   * Epoch id for the application, which is consistent across all components of a single deployment attempt.
+   * This may be empty, since this field was added to this class after the initial version.
+   */
+  public Optional<String> getSamzaEpochId() {
+    return samzaEpochId;
+  }
+
   public String getSource() {
     return source;
   }
@@ -120,6 +143,8 @@ public class MetricsHeader {
         map.get(JOB_ID).toString(),
         map.get(CONTAINER_NAME).toString(),
         map.get(EXEC_ENV_CONTAINER_ID).toString(),
+        // need to check existence for backwards compatibility with initial version of this class
+        Optional.ofNullable(map.get(SAMZA_EPOCH_ID)).map(Object::toString),
         map.get(SOURCE).toString(),
         map.get(VERSION).toString(),
         map.get(SAMZA_VERSION).toString(),
@@ -139,22 +164,23 @@ public class MetricsHeader {
     MetricsHeader that = (MetricsHeader) o;
     return time == that.time && resetTime == that.resetTime && Objects.equals(jobName, that.jobName) && Objects.equals(
         jobId, that.jobId) && Objects.equals(containerName, that.containerName) && Objects.equals(
-        execEnvironmentContainerId, that.execEnvironmentContainerId) && Objects.equals(source, that.source)
-        && Objects.equals(version, that.version) && Objects.equals(samzaVersion, that.samzaVersion) && Objects.equals(
-        host, that.host);
+        execEnvironmentContainerId, that.execEnvironmentContainerId) && Objects.equals(samzaEpochId,
+        that.samzaEpochId) && Objects.equals(source, that.source) && Objects.equals(version, that.version)
+        && Objects.equals(samzaVersion, that.samzaVersion) && Objects.equals(host, that.host);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(jobName, jobId, containerName, execEnvironmentContainerId, source, version, samzaVersion, host,
-        time, resetTime);
+    return Objects.hash(jobName, jobId, containerName, execEnvironmentContainerId, samzaEpochId, source,
+        version, samzaVersion, host, time, resetTime);
   }
 
   @Override
   public String toString() {
     return "MetricsHeader{" + "jobName='" + jobName + '\'' + ", jobId='" + jobId + '\'' + ", containerName='"
-        + containerName + '\'' + ", execEnvironmentContainerId='" + execEnvironmentContainerId + '\'' + ", source='"
-        + source + '\'' + ", version='" + version + '\'' + ", samzaVersion='" + samzaVersion + '\'' + ", host='" + host
-        + '\'' + ", time=" + time + ", resetTime=" + resetTime + '}';
+        + containerName + '\'' + ", execEnvironmentContainerId='" + execEnvironmentContainerId + '\''
+        + ", samzaEpochId=" + samzaEpochId + ", source='" + source + '\'' + ", version='" + version + '\''
+        + ", samzaVersion='" + samzaVersion + '\'' + ", host='" + host + '\'' + ", time=" + time + ", resetTime="
+        + resetTime + '}';
   }
 }
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/metrics/reporter/MetricsSnapshot.java b/samza-core/src/main/java/org/apache/samza/metrics/reporter/MetricsSnapshot.java
index 00b9deb..0454f79 100644
--- a/samza-core/src/main/java/org/apache/samza/metrics/reporter/MetricsSnapshot.java
+++ b/samza-core/src/main/java/org/apache/samza/metrics/reporter/MetricsSnapshot.java
@@ -75,6 +75,6 @@ public class MetricsSnapshot {
 
   @Override
   public String toString() {
-    return "MetricsSnapshotJava{" + "metricsHeader=" + metricsHeader + ", metrics=" + metrics + '}';
+    return "MetricsSnapshot{" + "metricsHeader=" + metricsHeader + ", metrics=" + metrics + '}';
   }
 }
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.java b/samza-core/src/main/java/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.java
index 648280a..c57c1cd 100644
--- a/samza-core/src/main/java/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.java
+++ b/samza-core/src/main/java/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.java
@@ -33,6 +33,7 @@ import java.util.regex.Pattern;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.ShellCommandConfig;
+import org.apache.samza.environment.EnvironmentVariables;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Gauge;
 import org.apache.samza.metrics.MetricsRegistryWithSource;
@@ -75,7 +76,8 @@ public class MetricsSnapshotReporter implements MetricsReporter, Runnable {
   private final Optional<Pattern> blacklist;
   private final Clock clock;
 
-  private final String execEnvironmentContainerId;
+  private final String executionEnvContainerId;
+  private final String samzaEpochId;
   private final ScheduledExecutorService executor;
   private final long resetTime;
   private final List<MetricsRegistryWithSource> registries = new ArrayList<>();
@@ -97,8 +99,9 @@ public class MetricsSnapshotReporter implements MetricsReporter, Runnable {
     this.blacklist = blacklist;
     this.clock = clock;
 
-    this.execEnvironmentContainerId =
+    this.executionEnvContainerId =
         Optional.ofNullable(System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID)).orElse("");
+    this.samzaEpochId = Optional.ofNullable(System.getenv(EnvironmentVariables.SAMZA_EPOCH_ID)).orElse("");
     this.executor = Executors.newSingleThreadScheduledExecutor(
         new ThreadFactoryBuilder().setNameFormat("Samza MetricsSnapshotReporter Thread-%d").setDaemon(true).build());
     this.resetTime = this.clock.currentTimeMillis();
@@ -193,8 +196,9 @@ public class MetricsSnapshotReporter implements MetricsReporter, Runnable {
       // publish to Kafka only if the metricsMsg carries any metrics
       if (!metricsMsg.isEmpty()) {
         MetricsHeader header =
-            new MetricsHeader(this.jobName, this.jobId, this.containerName, this.execEnvironmentContainerId, source,
-                this.version, this.samzaVersion, this.host, this.clock.currentTimeMillis(), this.resetTime);
+            new MetricsHeader(this.jobName, this.jobId, this.containerName, this.executionEnvContainerId,
+                Optional.of(this.samzaEpochId), source, this.version, this.samzaVersion, this.host,
+                this.clock.currentTimeMillis(), this.resetTime);
         Metrics metrics = new Metrics(metricsMsg);
         LOG.debug("Flushing metrics for {} to {} with header and map: header={}, map={}.", source, out,
             header.getAsMap(), metrics.getAsMap());
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 92eca0b..2994606 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -382,7 +382,8 @@ public class StreamProcessor {
     String jobName = new JobConfig(config).getName().get();
     String jobId = new JobConfig(config).getJobId();
     Optional<DiagnosticsManager> diagnosticsManager =
-        DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, processorId, Optional.empty(), config);
+        DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, processorId, Optional.empty(),
+            Optional.empty(), config);
 
     // Metadata store lifecycle managed outside of the SamzaContainer.
     // All manager lifecycles are managed in the SamzaContainer including startpointManager
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index 89ec196..e9f4311 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -41,6 +41,7 @@ import org.apache.samza.coordinator.stream.messages.SetConfig;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.coordinator.stream.messages.SetExecutionEnvContainerIdMapping;
 import org.apache.samza.diagnostics.DiagnosticsManager;
+import org.apache.samza.environment.EnvironmentVariables;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.logging.LoggingContextHolder;
 import org.apache.samza.metadatastore.MetadataStore;
@@ -67,9 +68,12 @@ public class ContainerLaunchUtil {
    * Any change here needs to take Beam into account.
    */
   public static void run(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc,  String containerId, JobModel jobModel) {
-    Optional<String> execEnvContainerId = Optional.ofNullable(System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID));
+    Optional<String> executionEnvContainerId =
+        Optional.ofNullable(System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID));
+    Optional<String> samzaEpochId = Optional.ofNullable(System.getenv(EnvironmentVariables.SAMZA_EPOCH_ID));
     JobConfig jobConfig = new JobConfig(jobModel.getConfig());
-    ContainerLaunchUtil.run(appDesc, jobConfig.getName().get(), jobConfig.getJobId(), containerId, execEnvContainerId, jobModel);
+    ContainerLaunchUtil.run(appDesc, jobConfig.getName().get(), jobConfig.getJobId(), containerId,
+        executionEnvContainerId, samzaEpochId, jobModel);
   }
 
   /**
@@ -77,7 +81,11 @@ public class ContainerLaunchUtil {
    */
   public static void run(
       ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc,
-      String jobName, String jobId, String containerId, Optional<String> execEnvContainerId,
+      String jobName,
+      String jobId,
+      String containerId,
+      Optional<String> executionEnvContainerId,
+      Optional<String> samzaEpochId,
       JobModel jobModel) {
     Config config = jobModel.getConfig();
 
@@ -87,8 +95,9 @@ public class ContainerLaunchUtil {
     MDC.put("jobId", jobId);
     LoggingContextHolder.INSTANCE.setConfig(jobModel.getConfig());
 
-    DiagnosticsUtil.writeMetadataFile(jobName, jobId, containerId, execEnvContainerId, config);
-    run(appDesc, jobName, jobId, containerId, execEnvContainerId, jobModel, config, buildExternalContext(config));
+    DiagnosticsUtil.writeMetadataFile(jobName, jobId, containerId, executionEnvContainerId, config);
+    run(appDesc, jobName, jobId, containerId, executionEnvContainerId, samzaEpochId, jobModel, config,
+        buildExternalContext(config));
 
     System.exit(0);
   }
@@ -98,7 +107,8 @@ public class ContainerLaunchUtil {
       String jobName,
       String jobId,
       String containerId,
-      Optional<String> execEnvContainerIdOptional,
+      Optional<String> executionEnvContainerId,
+      Optional<String> samzaEpochId,
       JobModel jobModel,
       Config config,
       Optional<ExternalContext> externalContextOptional) {
@@ -119,8 +129,8 @@ public class ContainerLaunchUtil {
 
       // Creating diagnostics manager and reporter, and wiring it respectively
       Optional<DiagnosticsManager> diagnosticsManager =
-          DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, containerId, execEnvContainerIdOptional,
-              config);
+          DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, containerId, executionEnvContainerId,
+              samzaEpochId, config);
       MetricsRegistryMap metricsRegistryMap = new MetricsRegistryMap();
 
       SamzaContainer container = SamzaContainer$.MODULE$.apply(
@@ -149,7 +159,7 @@ public class ContainerLaunchUtil {
       }
 
       if (new JobConfig(config).getApplicationMasterHighAvailabilityEnabled()) {
-        execEnvContainerIdOptional.ifPresent(execEnvContainerId -> {
+        executionEnvContainerId.ifPresent(execEnvContainerId -> {
           ExecutionContainerIdManager executionContainerIdManager = new ExecutionContainerIdManager(
               new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetExecutionEnvContainerIdMapping.TYPE));
           executionContainerIdManager.writeExecutionEnvironmentContainerIdMapping(containerId, execEnvContainerId);
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index 087d8bd..1c82000 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -30,6 +30,7 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.environment.EnvironmentVariables;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.logging.LoggingContextHolder;
 import org.apache.samza.util.SamzaUncaughtExceptionHandler;
@@ -56,7 +57,9 @@ public class LocalContainerRunner {
     String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL);
     System.out.println(String.format("Coordinator URL: %s", coordinatorUrl));
 
-    Optional<String> execEnvContainerId = Optional.ofNullable(System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID));
+    Optional<String> executionEnvContainerId =
+        Optional.ofNullable(System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID));
+    Optional<String> samzaEpochId = Optional.ofNullable(System.getenv(EnvironmentVariables.SAMZA_EPOCH_ID));
 
     int delay = new Random().nextInt(SamzaContainer.DEFAULT_READ_JOBMODEL_DELAY_MS()) + 1;
     JobModel jobModel = SamzaContainer.readJobModel(coordinatorUrl, delay);
@@ -75,7 +78,7 @@ public class LocalContainerRunner {
     ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc =
         ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(config), config);
 
-    ContainerLaunchUtil.run(appDesc, jobName, jobId, containerId, execEnvContainerId, jobModel);
+    ContainerLaunchUtil.run(appDesc, jobName, jobId, containerId, executionEnvContainerId, samzaEpochId,
+        jobModel);
   }
-
-}
+}
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
index f141d92..7ef29ac 100644
--- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
@@ -87,12 +87,13 @@ public class DiagnosticsUtil {
   }
 
   /**
-   * Create a pair of DiagnosticsManager and Reporter for the given jobName, jobId, containerId, and execEnvContainerId,
-   * if diagnostics is enabled.
-   * execEnvContainerId is the ID assigned to the container by the cluster manager (e.g., YARN).
+   * Create a {@link DiagnosticsManager} for the given jobName, jobId, containerId, and execEnvContainerId, if
+   * diagnostics is enabled.
+   * @param executionEnvContainerId ID assigned to the container by the cluster manager (e.g. YARN)
+   * @param samzaEpochId ID assigned to the job deployment attempt by the cluster manager
    */
-  public static Optional<DiagnosticsManager> buildDiagnosticsManager(String jobName,
-      String jobId, JobModel jobModel, String containerId, Optional<String> execEnvContainerId, Config config) {
+  public static Optional<DiagnosticsManager> buildDiagnosticsManager(String jobName, String jobId, JobModel jobModel,
+      String containerId, Optional<String> executionEnvContainerId, Optional<String> samzaEpochId, Config config) {
 
     JobConfig jobConfig = new JobConfig(config);
     MetricsConfig metricsConfig = new MetricsConfig(config);
@@ -132,8 +133,8 @@ public class DiagnosticsUtil {
       DiagnosticsManager diagnosticsManager =
           new DiagnosticsManager(jobName, jobId, jobModel.getContainers(), containerMemoryMb, containerNumCores,
               new StorageConfig(config).getNumPersistentStores(), maxHeapSizeBytes, containerThreadPoolSize,
-              containerId, execEnvContainerId.orElse(""), taskClassVersion, samzaVersion, hostName,
-              diagnosticsSystemStream, systemProducer,
+              containerId, executionEnvContainerId.orElse(""), samzaEpochId.orElse(""), taskClassVersion,
+              samzaVersion, hostName, diagnosticsSystemStream, systemProducer,
               Duration.ofMillis(new TaskConfig(config).getShutdownMs()), jobConfig.getAutosizingEnabled(), config);
 
       diagnosticsManagerOptional = Optional.of(diagnosticsManager);
diff --git a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
index 4389fcc..f420a93 100644
--- a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
@@ -25,6 +25,7 @@ import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -35,7 +36,9 @@ import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.Clock;
 import org.apache.samza.util.ReflectionUtil;
+import org.apache.samza.util.SystemClock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,6 +59,7 @@ public class DiagnosticsManager {
   private final String jobId;
   private final String containerId;
   private final String executionEnvContainerId;
+  private final String samzaEpochId;
   private final String taskClassVersion;
   private final String samzaVersion;
   private final String hostname;
@@ -70,6 +74,7 @@ public class DiagnosticsManager {
   private final Map<String, ContainerModel> containerModels;
   private final boolean autosizingEnabled;
   private final Config config;
+  private final Clock clock;
   private boolean jobParamsEmitted = false;
 
   private final SystemProducer systemProducer; // SystemProducer for writing diagnostics data
@@ -90,6 +95,7 @@ public class DiagnosticsManager {
       int containerThreadPoolSize,
       String containerId,
       String executionEnvContainerId,
+      String samzaEpochId,
       String taskClassVersion,
       String samzaVersion,
       String hostname,
@@ -99,10 +105,12 @@ public class DiagnosticsManager {
       boolean autosizingEnabled,
       Config config) {
 
-    this(jobName, jobId, containerModels, containerMemoryMb, containerNumCores, numPersistentStores, maxHeapSizeBytes, containerThreadPoolSize,
-        containerId, executionEnvContainerId, taskClassVersion, samzaVersion, hostname, diagnosticSystemStream, systemProducer,
-        terminationDuration, Executors.newSingleThreadScheduledExecutor(
-            new ThreadFactoryBuilder().setNameFormat(PUBLISH_THREAD_NAME).setDaemon(true).build()), autosizingEnabled, config);
+    this(jobName, jobId, containerModels, containerMemoryMb, containerNumCores, numPersistentStores, maxHeapSizeBytes,
+        containerThreadPoolSize, containerId, executionEnvContainerId, samzaEpochId, taskClassVersion,
+        samzaVersion, hostname, diagnosticSystemStream, systemProducer, terminationDuration,
+        Executors.newSingleThreadScheduledExecutor(
+            new ThreadFactoryBuilder().setNameFormat(PUBLISH_THREAD_NAME).setDaemon(true).build()), autosizingEnabled,
+        config, SystemClock.instance());
   }
 
   @VisibleForTesting
@@ -116,6 +124,7 @@ public class DiagnosticsManager {
       int containerThreadPoolSize,
       String containerId,
       String executionEnvContainerId,
+      String samzaEpochId,
       String taskClassVersion,
       String samzaVersion,
       String hostname,
@@ -124,7 +133,8 @@ public class DiagnosticsManager {
       Duration terminationDuration,
       ScheduledExecutorService executorService,
       boolean autosizingEnabled,
-      Config config) {
+      Config config,
+      Clock clock) {
     this.jobName = jobName;
     this.jobId = jobId;
     this.containerModels = containerModels;
@@ -135,6 +145,7 @@ public class DiagnosticsManager {
     this.containerThreadPoolSize = containerThreadPoolSize;
     this.containerId = containerId;
     this.executionEnvContainerId = executionEnvContainerId;
+    this.samzaEpochId = samzaEpochId;
     this.taskClassVersion = taskClassVersion;
     this.samzaVersion = samzaVersion;
     this.hostname = hostname;
@@ -147,8 +158,9 @@ public class DiagnosticsManager {
     this.scheduler = executorService;
     this.autosizingEnabled = autosizingEnabled;
     this.config = config;
+    this.clock = clock;
 
-    resetTime = Instant.now();
+    this.resetTime = Instant.ofEpochMilli(this.clock.currentTimeMillis());
     this.systemProducer.register(getClass().getSimpleName());
 
     try {
@@ -199,13 +211,13 @@ public class DiagnosticsManager {
   }
 
   private class DiagnosticsStreamPublisher implements Runnable {
-
     @Override
     public void run() {
       try {
         DiagnosticsStreamMessage diagnosticsStreamMessage =
             new DiagnosticsStreamMessage(jobName, jobId, "samza-container-" + containerId, executionEnvContainerId,
-                taskClassVersion, samzaVersion, hostname, System.currentTimeMillis(), resetTime.toEpochMilli());
+                Optional.of(samzaEpochId), taskClassVersion, samzaVersion, hostname,
+                clock.currentTimeMillis(), resetTime.toEpochMilli());
 
         // Add job-related params to the message (if not already published)
         if (!jobParamsEmitted) {
diff --git a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java
index ffe056e..9cc8d72 100644
--- a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java
+++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.samza.config.Config;
@@ -68,13 +69,10 @@ public class DiagnosticsStreamMessage {
   private final Map<String, Map<String, Object>> metricsMessage;
 
   public DiagnosticsStreamMessage(String jobName, String jobId, String containerName, String executionEnvContainerId,
-      String taskClassVersion, String samzaVersion, String hostname, long timestamp, long resetTimestamp) {
-
-    // Create the metricHeader
-    metricsHeader =
-        new MetricsHeader(jobName, jobId, containerName, executionEnvContainerId, DiagnosticsManager.class.getName(),
-            taskClassVersion, samzaVersion, hostname, timestamp, resetTimestamp);
-
+      Optional<String> samzaEpochId, String taskClassVersion, String samzaVersion, String hostname,
+      long timestamp, long resetTimestamp) {
+    this.metricsHeader = new MetricsHeader(jobName, jobId, containerName, executionEnvContainerId, samzaEpochId,
+        DiagnosticsManager.class.getName(), taskClassVersion, samzaVersion, hostname, timestamp, resetTimestamp);
     this.metricsMessage = new HashMap<>();
   }
 
@@ -254,9 +252,9 @@ public class DiagnosticsStreamMessage {
     DiagnosticsStreamMessage diagnosticsStreamMessage =
         new DiagnosticsStreamMessage(metricsSnapshot.getHeader().getJobName(), metricsSnapshot.getHeader().getJobId(),
             metricsSnapshot.getHeader().getContainerName(), metricsSnapshot.getHeader().getExecEnvironmentContainerId(),
-            metricsSnapshot.getHeader().getVersion(), metricsSnapshot.getHeader().getSamzaVersion(),
-            metricsSnapshot.getHeader().getHost(), metricsSnapshot.getHeader().getTime(),
-            metricsSnapshot.getHeader().getResetTime());
+            metricsSnapshot.getHeader().getSamzaEpochId(), metricsSnapshot.getHeader().getVersion(),
+            metricsSnapshot.getHeader().getSamzaVersion(), metricsSnapshot.getHeader().getHost(),
+            metricsSnapshot.getHeader().getTime(), metricsSnapshot.getHeader().getResetTime());
 
     Map<String, Map<String, Object>> metricsMap = metricsSnapshot.getMetrics().getAsMap();
     Map<String, Object> diagnosticsManagerGroupMap = metricsMap.get(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER);
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java
index 9386883..b55b490 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java
@@ -87,6 +87,8 @@ public class TestStaticResourceJobCoordinator {
   private static final String JOB_NAME = "my-samza-job";
   private static final String JOB_ID = "123";
   private static final String PROCESSOR_ID = "samza-job-coordinator";
+  private static final Optional<String> EXECUTION_ENV_CONTAINER_ID = Optional.of("execution_container_123");
+  private static final Optional<String> SAMZA_EPOCH_ID = Optional.of("epoch_123");
   private static final SystemStream SYSTEM_STREAM = new SystemStream("system", "stream");
   private static final TaskName TASK_NAME = new TaskName("Partition " + 0);
   private static final Map<String, ContainerModel> CONTAINERS = ImmutableMap.of("0", new ContainerModel("0",
@@ -136,8 +138,8 @@ public class TestStaticResourceJobCoordinator {
     this.staticResourceJobCoordinator =
         spy(new StaticResourceJobCoordinator(PROCESSOR_ID, this.jobModelHelper, this.jobModelServingContext,
             this.coordinatorCommunication, this.jobCoordinatorMetadataManager, this.streamPartitionCountMonitorFactory,
-            this.streamRegexMonitorFactory, this.startpointManager, this.changelogStreamManager, this.jobRestartSignal,
-            this.metrics, this.systemAdmins, this.config));
+            this.streamRegexMonitorFactory, Optional.of(this.startpointManager), this.changelogStreamManager,
+            this.jobRestartSignal, this.metrics, this.systemAdmins, EXECUTION_ENV_CONTAINER_ID, SAMZA_EPOCH_ID, this.config));
     this.staticResourceJobCoordinator.setListener(this.jobCoordinatorListener);
     doNothing().when(this.staticResourceJobCoordinator).doSetLoggingContextConfig(any());
   }
@@ -224,8 +226,8 @@ public class TestStaticResourceJobCoordinator {
     this.staticResourceJobCoordinator =
         spy(new StaticResourceJobCoordinator(PROCESSOR_ID, this.jobModelHelper, this.jobModelServingContext,
             this.coordinatorCommunication, this.jobCoordinatorMetadataManager, this.streamPartitionCountMonitorFactory,
-            this.streamRegexMonitorFactory, null, this.changelogStreamManager, this.jobRestartSignal, this.metrics,
-            this.systemAdmins, this.config));
+            this.streamRegexMonitorFactory, Optional.empty(), this.changelogStreamManager, this.jobRestartSignal,
+            this.metrics, this.systemAdmins, Optional.empty(), Optional.empty(), this.config));
     Config jobModelConfig = mock(Config.class);
     JobModel jobModel = setupJobModel(jobModelConfig);
     StreamPartitionCountMonitor streamPartitionCountMonitor = setupStreamPartitionCountMonitor(jobModelConfig);
@@ -233,13 +235,15 @@ public class TestStaticResourceJobCoordinator {
     JobCoordinatorMetadata newMetadata = setupJobCoordinatorMetadata(jobModel, jobModelConfig,
         ImmutableSet.copyOf(Arrays.asList(JobMetadataChange.values())), false);
     doReturn(Optional.empty()).when(this.staticResourceJobCoordinator)
-        .buildDiagnosticsManager(JOB_NAME, JOB_ID, jobModel,
-            CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME, Optional.empty(), this.config);
+        .buildDiagnosticsManager(JOB_NAME, JOB_ID, jobModel, CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME,
+            Optional.empty(), Optional.empty(), this.config);
     MetadataResourceUtil metadataResourceUtil = metadataResourceUtil(jobModel);
     this.staticResourceJobCoordinator.start();
     assertEquals(jobModel, this.staticResourceJobCoordinator.getJobModel());
     verify(this.systemAdmins).start();
     verify(this.staticResourceJobCoordinator).doSetLoggingContextConfig(jobModelConfig);
+    verify(this.staticResourceJobCoordinator).buildDiagnosticsManager(JOB_NAME, JOB_ID, jobModel,
+        CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME, Optional.empty(), Optional.empty(), this.config);
     verifyPrepareWorkerExecutionAndMonitor(jobModel, metadataResourceUtil, streamPartitionCountMonitor, null,
         newMetadata, null);
     verifyZeroInteractions(this.jobCoordinatorListener, this.startpointManager);
@@ -275,8 +279,8 @@ public class TestStaticResourceJobCoordinator {
     this.staticResourceJobCoordinator =
         spy(new StaticResourceJobCoordinator(PROCESSOR_ID, this.jobModelHelper, this.jobModelServingContext,
             this.coordinatorCommunication, this.jobCoordinatorMetadataManager, this.streamPartitionCountMonitorFactory,
-            this.streamRegexMonitorFactory, null, this.changelogStreamManager, this.jobRestartSignal, this.metrics,
-            this.systemAdmins, this.config));
+            this.streamRegexMonitorFactory, Optional.empty(), this.changelogStreamManager, this.jobRestartSignal,
+            this.metrics, this.systemAdmins, Optional.empty(), Optional.empty(), this.config));
 
     Config jobModelConfig = mock(Config.class);
     JobModel jobModel = setupJobModel(jobModelConfig);
@@ -285,8 +289,8 @@ public class TestStaticResourceJobCoordinator {
     setupJobCoordinatorMetadata(jobModel, jobModelConfig,
         ImmutableSet.copyOf(Arrays.asList(JobMetadataChange.values())), false);
     doReturn(Optional.empty()).when(this.staticResourceJobCoordinator)
-        .buildDiagnosticsManager(JOB_NAME, JOB_ID, jobModel,
-            CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME, Optional.empty(), this.config);
+        .buildDiagnosticsManager(JOB_NAME, JOB_ID, jobModel, CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME,
+            Optional.empty(), Optional.empty(), this.config);
     metadataResourceUtil(jobModel);
     // call start in order to set up monitors
     this.staticResourceJobCoordinator.start();
@@ -424,7 +428,8 @@ public class TestStaticResourceJobCoordinator {
   private void setUpDiagnosticsManager(JobModel expectedJobModel) {
     doReturn(Optional.of(this.diagnosticsManager)).when(this.staticResourceJobCoordinator)
         .buildDiagnosticsManager(JOB_NAME, JOB_ID, expectedJobModel,
-            CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME, Optional.empty(), this.config);
+            CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME, EXECUTION_ENV_CONTAINER_ID, SAMZA_EPOCH_ID,
+            this.config);
   }
 
   private void verifyStartLifecycle() {
diff --git a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java
index 8ff58eb..5ef6129 100644
--- a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java
+++ b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java
@@ -26,17 +26,20 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.metrics.reporter.MetricsHeader;
 import org.apache.samza.metrics.reporter.MetricsSnapshot;
 import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.Clock;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -48,29 +51,34 @@ public class TestDiagnosticsManager {
   private DiagnosticsManager diagnosticsManager;
   private MockSystemProducer mockSystemProducer;
   private ScheduledExecutorService mockExecutorService;
-  private SystemStream diagnosticsSystemStream = new SystemStream("kafka", "test stream");
-
-  private String jobName = "Testjob";
-  private String jobId = "test job id";
-  private String executionEnvContainerId = "exec container id";
-  private String taskClassVersion = "0.0.1";
-  private String samzaVersion = "1.3.0";
-  private String hostname = "sample host name";
-  private int containerMb = 1024;
-  private int containerThreadPoolSize = 2;
-  private long maxHeapSize = 900;
-  private int numPersistentStores = 2;
-  private int containerNumCores = 2;
-  private boolean autosizingEnabled = false;
-  private Config config = new MapConfig(ImmutableMap.of("job.name", jobName, "job.id", jobId,
+  private final SystemStream diagnosticsSystemStream = new SystemStream("kafka", "test stream");
+
+  private static final String JOB_NAME = "Testjob";
+  private static final String JOB_ID = "test job id";
+  private static final String EXECUTION_ENV_CONTAINER_ID = "exec container id";
+  private static final String SAMZA_EPOCH_ID = "epoch-123";
+  private static final String TASK_CLASS_VERSION = "0.0.1";
+  private static final String SAMZA_VERSION = "1.3.0";
+  private static final String HOSTNAME = "sample host name";
+  private static final int CONTAINER_MB = 1024;
+  private static final int CONTAINER_THREAD_POOL_SIZE = 2;
+  private static final long MAX_HEAP_SIZE = 900;
+  private static final int NUM_PERSISTENT_STORES = 2;
+  private static final int CONTAINER_NUM_CORES = 2;
+  private static final boolean AUTOSIZING_ENABLED = false;
+  private static final long RESET_TIME = 10;
+  private static final long FIRST_SEND_TIME = 20;
+  private static final long SECOND_SEND_TIME = 30;
+  private final Config config = new MapConfig(ImmutableMap.of("job.name", JOB_NAME, "job.id", JOB_ID,
       "cluster-manager.container.memory.mb", "1024", "cluster-manager.container. cpu.cores", "1",
       "cluster-manager.container.retry.count", "8"));
-  private Map<String, ContainerModel> containerModels = TestDiagnosticsStreamMessage.getSampleContainerModels();
-  private Collection<DiagnosticsExceptionEvent> exceptionEventList = TestDiagnosticsStreamMessage.getExceptionList();
+  private final Map<String, ContainerModel> containerModels = TestDiagnosticsStreamMessage.getSampleContainerModels();
+  private final Collection<DiagnosticsExceptionEvent> exceptionEventList =
+      TestDiagnosticsStreamMessage.getExceptionList();
+  private Clock clock;
 
   @Before
   public void setup() {
-
     // Mocked system producer for publishing to diagnostics stream
     mockSystemProducer = new MockSystemProducer();
 
@@ -83,25 +91,30 @@ public class TestDiagnosticsManager {
               .mock(ScheduledFuture.class);
         });
 
+    this.clock = Mockito.mock(Clock.class);
+    // first call yields the reset time (read in the constructor); later calls yield each message's publish time
+    Mockito.when(this.clock.currentTimeMillis()).thenReturn(RESET_TIME, FIRST_SEND_TIME, SECOND_SEND_TIME);
+
     this.diagnosticsManager =
-        new DiagnosticsManager(jobName, jobId, containerModels, containerMb, containerNumCores, numPersistentStores, maxHeapSize, containerThreadPoolSize,
-            "0", executionEnvContainerId, taskClassVersion, samzaVersion, hostname, diagnosticsSystemStream,
-            mockSystemProducer, Duration.ofSeconds(1), mockExecutorService, autosizingEnabled, config);
+        new DiagnosticsManager(JOB_NAME, JOB_ID, containerModels, CONTAINER_MB, CONTAINER_NUM_CORES,
+            NUM_PERSISTENT_STORES, MAX_HEAP_SIZE, CONTAINER_THREAD_POOL_SIZE, "0", EXECUTION_ENV_CONTAINER_ID,
+            SAMZA_EPOCH_ID, TASK_CLASS_VERSION, SAMZA_VERSION, HOSTNAME, diagnosticsSystemStream,
+            mockSystemProducer, Duration.ofSeconds(1), mockExecutorService, AUTOSIZING_ENABLED, config, this.clock);
 
     exceptionEventList.forEach(
       diagnosticsExceptionEvent -> this.diagnosticsManager.addExceptionEvent(diagnosticsExceptionEvent));
 
-    this.diagnosticsManager.addProcessorStopEvent("0", executionEnvContainerId, hostname, 101);
+    this.diagnosticsManager.addProcessorStopEvent("0", EXECUTION_ENV_CONTAINER_ID, HOSTNAME, 101);
   }
 
   @Test
   public void testDiagnosticsManagerStart() {
     SystemProducer mockSystemProducer = Mockito.mock(SystemProducer.class);
     DiagnosticsManager diagnosticsManager =
-        new DiagnosticsManager(jobName, jobId, containerModels, containerMb, containerNumCores, numPersistentStores,
-            maxHeapSize, containerThreadPoolSize, "0", executionEnvContainerId, taskClassVersion, samzaVersion,
-            hostname, diagnosticsSystemStream, mockSystemProducer, Duration.ofSeconds(1), mockExecutorService,
-            autosizingEnabled, config);
+        new DiagnosticsManager(JOB_NAME, JOB_ID, containerModels, CONTAINER_MB, CONTAINER_NUM_CORES,
+            NUM_PERSISTENT_STORES, MAX_HEAP_SIZE, CONTAINER_THREAD_POOL_SIZE, "0", EXECUTION_ENV_CONTAINER_ID,
+            SAMZA_EPOCH_ID, TASK_CLASS_VERSION, SAMZA_VERSION, HOSTNAME, diagnosticsSystemStream,
+            mockSystemProducer, Duration.ofSeconds(1), mockExecutorService, AUTOSIZING_ENABLED, config, this.clock);
 
     diagnosticsManager.start();
 
@@ -117,10 +130,10 @@ public class TestDiagnosticsManager {
     Mockito.when(mockExecutorService.isTerminated()).thenReturn(true);
     Duration terminationDuration = Duration.ofSeconds(1);
     DiagnosticsManager diagnosticsManager =
-        new DiagnosticsManager(jobName, jobId, containerModels, containerMb, containerNumCores, numPersistentStores,
-            maxHeapSize, containerThreadPoolSize, "0", executionEnvContainerId, taskClassVersion, samzaVersion,
-            hostname, diagnosticsSystemStream, mockSystemProducer, terminationDuration, mockExecutorService,
-            autosizingEnabled, config);
+        new DiagnosticsManager(JOB_NAME, JOB_ID, containerModels, CONTAINER_MB, CONTAINER_NUM_CORES,
+            NUM_PERSISTENT_STORES, MAX_HEAP_SIZE, CONTAINER_THREAD_POOL_SIZE, "0", EXECUTION_ENV_CONTAINER_ID,
+            SAMZA_EPOCH_ID, TASK_CLASS_VERSION, SAMZA_VERSION, HOSTNAME, diagnosticsSystemStream,
+            mockSystemProducer, terminationDuration, mockExecutorService, AUTOSIZING_ENABLED, config, this.clock);
 
     diagnosticsManager.stop();
 
@@ -137,10 +150,10 @@ public class TestDiagnosticsManager {
     Mockito.when(mockExecutorService.isTerminated()).thenReturn(false);
     Duration terminationDuration = Duration.ofSeconds(1);
     DiagnosticsManager diagnosticsManager =
-        new DiagnosticsManager(jobName, jobId, containerModels, containerMb, containerNumCores, numPersistentStores,
-            maxHeapSize, containerThreadPoolSize, "0", executionEnvContainerId, taskClassVersion, samzaVersion,
-            hostname, diagnosticsSystemStream, mockSystemProducer, terminationDuration, mockExecutorService,
-            autosizingEnabled, config);
+        new DiagnosticsManager(JOB_NAME, JOB_ID, containerModels, CONTAINER_MB, CONTAINER_NUM_CORES,
+            NUM_PERSISTENT_STORES, MAX_HEAP_SIZE, CONTAINER_THREAD_POOL_SIZE, "0", EXECUTION_ENV_CONTAINER_ID,
+            SAMZA_EPOCH_ID, TASK_CLASS_VERSION, SAMZA_VERSION, HOSTNAME, diagnosticsSystemStream,
+            mockSystemProducer, terminationDuration, mockExecutorService, AUTOSIZING_ENABLED, config, this.clock);
 
     diagnosticsManager.stop();
 
@@ -168,7 +181,7 @@ public class TestDiagnosticsManager {
 
     Assert.assertEquals("One message should have been published", 1, mockSystemProducer.getEnvelopeList().size());
     OutgoingMessageEnvelope outgoingMessageEnvelope = mockSystemProducer.getEnvelopeList().get(0);
-    validateMetricsHeader(outgoingMessageEnvelope);
+    validateMetricsHeader(outgoingMessageEnvelope, FIRST_SEND_TIME);
     validateOutgoingMessageEnvelope(outgoingMessageEnvelope);
   }
 
@@ -176,19 +189,19 @@ public class TestDiagnosticsManager {
   public void testSecondPublishWithProcessorStopInSecondMessage() {
     // Across two successive run() invocations two messages should be published if stop events are added
     this.diagnosticsManager.start();
-    this.diagnosticsManager.addProcessorStopEvent("0", executionEnvContainerId, hostname, 102);
+    this.diagnosticsManager.addProcessorStopEvent("0", EXECUTION_ENV_CONTAINER_ID, HOSTNAME, 102);
     this.diagnosticsManager.start();
 
     Assert.assertEquals("Two messages should have been published", 2, mockSystemProducer.getEnvelopeList().size());
 
     // Validate the first message
     OutgoingMessageEnvelope outgoingMessageEnvelope = mockSystemProducer.getEnvelopeList().get(0);
-    validateMetricsHeader(outgoingMessageEnvelope);
+    validateMetricsHeader(outgoingMessageEnvelope, FIRST_SEND_TIME);
     validateOutgoingMessageEnvelope(outgoingMessageEnvelope);
 
     // Validate the second message's header
     outgoingMessageEnvelope = mockSystemProducer.getEnvelopeList().get(1);
-    validateMetricsHeader(outgoingMessageEnvelope);
+    validateMetricsHeader(outgoingMessageEnvelope, SECOND_SEND_TIME);
 
     // Validate the second message's body (should be all empty except for the processor-stop-event)
     MetricsSnapshot metricsSnapshot =
@@ -199,7 +212,7 @@ public class TestDiagnosticsManager {
     Assert.assertNull(diagnosticsStreamMessage.getContainerMb());
     Assert.assertNull(diagnosticsStreamMessage.getExceptionEvents());
     Assert.assertEquals(diagnosticsStreamMessage.getProcessorStopEvents(),
-        Arrays.asList(new ProcessorStopEvent("0", executionEnvContainerId, hostname, 102)));
+        Arrays.asList(new ProcessorStopEvent("0", EXECUTION_ENV_CONTAINER_ID, HOSTNAME, 102)));
     Assert.assertNull(diagnosticsStreamMessage.getContainerModels());
     Assert.assertNull(diagnosticsStreamMessage.getContainerNumCores());
     Assert.assertNull(diagnosticsStreamMessage.getNumPersistentStores());
@@ -217,12 +230,12 @@ public class TestDiagnosticsManager {
 
     // Validate the first message
     OutgoingMessageEnvelope outgoingMessageEnvelope = mockSystemProducer.getEnvelopeList().get(0);
-    validateMetricsHeader(outgoingMessageEnvelope);
+    validateMetricsHeader(outgoingMessageEnvelope, FIRST_SEND_TIME);
     validateOutgoingMessageEnvelope(outgoingMessageEnvelope);
 
     // Validate the second message's header
     outgoingMessageEnvelope = mockSystemProducer.getEnvelopeList().get(1);
-    validateMetricsHeader(outgoingMessageEnvelope);
+    validateMetricsHeader(outgoingMessageEnvelope, SECOND_SEND_TIME);
 
     // Validate the second message's body (should be all empty except for the processor-stop-event)
     MetricsSnapshot metricsSnapshot =
@@ -243,22 +256,17 @@ public class TestDiagnosticsManager {
     this.diagnosticsManager.stop();
   }
 
-  private void validateMetricsHeader(OutgoingMessageEnvelope outgoingMessageEnvelope) {
+  private void validateMetricsHeader(OutgoingMessageEnvelope outgoingMessageEnvelope, long sendTime) {
     // Validate the outgoing message
 
     Assert.assertTrue(outgoingMessageEnvelope.getSystemStream().equals(diagnosticsSystemStream));
     MetricsSnapshot metricsSnapshot =
         new MetricsSnapshotSerdeV2().fromBytes((byte[]) outgoingMessageEnvelope.getMessage());
 
-    // Validate all header fields
-    Assert.assertEquals(metricsSnapshot.getHeader().getJobName(), jobName);
-    Assert.assertEquals(metricsSnapshot.getHeader().getJobId(), jobId);
-    Assert.assertEquals(metricsSnapshot.getHeader().getExecEnvironmentContainerId(), executionEnvContainerId);
-    Assert.assertEquals(metricsSnapshot.getHeader().getVersion(), taskClassVersion);
-    Assert.assertEquals(metricsSnapshot.getHeader().getSamzaVersion(), samzaVersion);
-    Assert.assertEquals(metricsSnapshot.getHeader().getHost(), hostname);
-    Assert.assertEquals(metricsSnapshot.getHeader().getSource(), DiagnosticsManager.class.getName());
-
+    MetricsHeader expectedHeader = new MetricsHeader(JOB_NAME, JOB_ID, "samza-container-0", EXECUTION_ENV_CONTAINER_ID,
+        Optional.of(SAMZA_EPOCH_ID), DiagnosticsManager.class.getName(), TASK_CLASS_VERSION, SAMZA_VERSION,
+        HOSTNAME, sendTime, RESET_TIME);
+    Assert.assertEquals(expectedHeader, metricsSnapshot.getHeader());
   }
 
   private void validateOutgoingMessageEnvelope(OutgoingMessageEnvelope outgoingMessageEnvelope) {
@@ -269,15 +277,16 @@ public class TestDiagnosticsManager {
     DiagnosticsStreamMessage diagnosticsStreamMessage =
         DiagnosticsStreamMessage.convertToDiagnosticsStreamMessage(metricsSnapshot);
 
-    Assert.assertEquals(containerMb, diagnosticsStreamMessage.getContainerMb().intValue());
-    Assert.assertEquals(maxHeapSize, diagnosticsStreamMessage.getMaxHeapSize().longValue());
-    Assert.assertEquals(containerThreadPoolSize, diagnosticsStreamMessage.getContainerThreadPoolSize().intValue());
+    Assert.assertEquals(CONTAINER_MB, diagnosticsStreamMessage.getContainerMb().intValue());
+    Assert.assertEquals(MAX_HEAP_SIZE, diagnosticsStreamMessage.getMaxHeapSize().longValue());
+    Assert.assertEquals(CONTAINER_THREAD_POOL_SIZE, diagnosticsStreamMessage.getContainerThreadPoolSize().intValue());
     Assert.assertEquals(exceptionEventList, diagnosticsStreamMessage.getExceptionEvents());
-    Assert.assertEquals(diagnosticsStreamMessage.getProcessorStopEvents(), Arrays.asList(new ProcessorStopEvent("0", executionEnvContainerId, hostname, 101)));
+    Assert.assertEquals(diagnosticsStreamMessage.getProcessorStopEvents(), Arrays.asList(new ProcessorStopEvent("0",
+        EXECUTION_ENV_CONTAINER_ID, HOSTNAME, 101)));
     Assert.assertEquals(containerModels, diagnosticsStreamMessage.getContainerModels());
-    Assert.assertEquals(containerNumCores, diagnosticsStreamMessage.getContainerNumCores().intValue());
-    Assert.assertEquals(numPersistentStores, diagnosticsStreamMessage.getNumPersistentStores().intValue());
-    Assert.assertEquals(autosizingEnabled, diagnosticsStreamMessage.getAutosizingEnabled());
+    Assert.assertEquals(CONTAINER_NUM_CORES, diagnosticsStreamMessage.getContainerNumCores().intValue());
+    Assert.assertEquals(NUM_PERSISTENT_STORES, diagnosticsStreamMessage.getNumPersistentStores().intValue());
+    Assert.assertEquals(AUTOSIZING_ENABLED, diagnosticsStreamMessage.getAutosizingEnabled());
     Assert.assertEquals(config, diagnosticsStreamMessage.getConfig());
   }
 
diff --git a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java
index 72b1b5f..459fd14 100644
--- a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java
+++ b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import org.apache.samza.Partition;
 import org.apache.samza.config.Config;
@@ -32,6 +33,7 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metrics.reporter.MetricsHeader;
 import org.apache.samza.metrics.reporter.MetricsSnapshot;
 import org.apache.samza.system.SystemStreamPartition;
 import org.junit.Assert;
@@ -39,22 +41,22 @@ import org.junit.Test;
 
 
 public class TestDiagnosticsStreamMessage {
-
-  private final String jobName = "Testjob";
-  private final String jobId = "test job id";
-  private final String containerName = "sample container name";
-  private final String executionEnvContainerId = "exec container id";
-  private final String taskClassVersion = "0.0.1";
-  private final String samzaVersion = "1.3.0";
-  private final String hostname = "sample host name";
+  private static final String JOB_NAME = "Testjob";
+  private static final String JOB_ID = "test job id";
+  private static final String CONTAINER_NAME = "sample container name";
+  private static final String EXECUTION_ENV_CONTAINER_ID = "exec container id";
+  private static final String SAMZA_EPOCH_ID = "epoch-123";
+  private static final String TASK_CLASS_VERSION = "0.0.1";
+  private static final String SAMZA_VERSION = "1.3.0";
+  private static final String HOSTNAME = "sample host name";
   private final long timestamp = System.currentTimeMillis();
   private final long resetTimestamp = System.currentTimeMillis();
-  private final Config config = new MapConfig(ImmutableMap.of("job.name", jobName, "job.id", jobId));
+  private final Config config = new MapConfig(ImmutableMap.of("job.name", JOB_NAME, "job.id", JOB_ID));
 
-  private DiagnosticsStreamMessage getDiagnosticsStreamMessage() {
+  private DiagnosticsStreamMessage getDiagnosticsStreamMessage(Optional<String> samzaEpochId) {
     DiagnosticsStreamMessage diagnosticsStreamMessage =
-        new DiagnosticsStreamMessage(jobName, jobId, containerName, executionEnvContainerId, taskClassVersion,
-            samzaVersion, hostname, timestamp, resetTimestamp);
+        new DiagnosticsStreamMessage(JOB_NAME, JOB_ID, CONTAINER_NAME, EXECUTION_ENV_CONTAINER_ID, samzaEpochId,
+            TASK_CLASS_VERSION, SAMZA_VERSION, HOSTNAME, timestamp, resetTimestamp);
 
     diagnosticsStreamMessage.addContainerMb(1024);
     diagnosticsStreamMessage.addContainerNumCores(2);
@@ -66,10 +68,10 @@ public class TestDiagnosticsStreamMessage {
   }
 
   public static Collection<DiagnosticsExceptionEvent> getExceptionList() {
-    BoundedList boundedList = new BoundedList<DiagnosticsExceptionEvent>("exceptions");
+    BoundedList<DiagnosticsExceptionEvent> boundedList = new BoundedList<>("exceptions");
     DiagnosticsExceptionEvent diagnosticsExceptionEvent =
         new DiagnosticsExceptionEvent(1, new Exception("this is a samza exception", new Exception("cause")),
-            new HashMap());
+            new HashMap<>());
 
     boundedList.add(diagnosticsExceptionEvent);
     return boundedList.getValues();
@@ -77,7 +79,7 @@ public class TestDiagnosticsStreamMessage {
 
   public List<ProcessorStopEvent> getProcessorStopEventList() {
     List<ProcessorStopEvent> stopEventList = new ArrayList<>();
-    stopEventList.add(new ProcessorStopEvent("0", executionEnvContainerId, hostname, 101));
+    stopEventList.add(new ProcessorStopEvent("0", EXECUTION_ENV_CONTAINER_ID, HOSTNAME, 101));
     return stopEventList;
   }
 
@@ -102,8 +104,8 @@ public class TestDiagnosticsStreamMessage {
    */
   @Test
   public void basicTest() {
-
-    DiagnosticsStreamMessage diagnosticsStreamMessage = getDiagnosticsStreamMessage();
+    DiagnosticsStreamMessage diagnosticsStreamMessage =
+        getDiagnosticsStreamMessage(Optional.of(SAMZA_EPOCH_ID));
     Collection<DiagnosticsExceptionEvent> exceptionEventList = getExceptionList();
     diagnosticsStreamMessage.addDiagnosticsExceptionEvents(exceptionEventList);
     diagnosticsStreamMessage.addProcessorStopEvents(getProcessorStopEventList());
@@ -123,20 +125,18 @@ public class TestDiagnosticsStreamMessage {
    */
   @Test
   public void serdeTest() {
-    DiagnosticsStreamMessage diagnosticsStreamMessage = getDiagnosticsStreamMessage();
+    DiagnosticsStreamMessage diagnosticsStreamMessage =
+        getDiagnosticsStreamMessage(Optional.of(SAMZA_EPOCH_ID));
     Collection<DiagnosticsExceptionEvent> exceptionEventList = getExceptionList();
     diagnosticsStreamMessage.addDiagnosticsExceptionEvents(exceptionEventList);
     diagnosticsStreamMessage.addProcessorStopEvents(getProcessorStopEventList());
     diagnosticsStreamMessage.addContainerModels(getSampleContainerModels());
 
     MetricsSnapshot metricsSnapshot = diagnosticsStreamMessage.convertToMetricsSnapshot();
-    Assert.assertEquals(metricsSnapshot.getHeader().getJobName(), jobName);
-    Assert.assertEquals(metricsSnapshot.getHeader().getJobId(), jobId);
-    Assert.assertEquals(metricsSnapshot.getHeader().getExecEnvironmentContainerId(), executionEnvContainerId);
-    Assert.assertEquals(metricsSnapshot.getHeader().getVersion(), taskClassVersion);
-    Assert.assertEquals(metricsSnapshot.getHeader().getSamzaVersion(), samzaVersion);
-    Assert.assertEquals(metricsSnapshot.getHeader().getHost(), hostname);
-    Assert.assertEquals(metricsSnapshot.getHeader().getSource(), DiagnosticsManager.class.getName());
+    MetricsHeader expectedHeader = new MetricsHeader(JOB_NAME, JOB_ID, CONTAINER_NAME, EXECUTION_ENV_CONTAINER_ID,
+        Optional.of(SAMZA_EPOCH_ID), DiagnosticsManager.class.getName(), TASK_CLASS_VERSION, SAMZA_VERSION,
+        HOSTNAME, timestamp, resetTimestamp);
+    Assert.assertEquals(metricsSnapshot.getHeader(), expectedHeader);
 
     Map<String, Map<String, Object>> metricsMap = metricsSnapshot.getMetrics().getAsMap();
     Assert.assertTrue(metricsMap.get("org.apache.samza.container.SamzaContainerMetrics").containsKey("exceptions"));
@@ -149,7 +149,20 @@ public class TestDiagnosticsStreamMessage {
 
     DiagnosticsStreamMessage convertedDiagnosticsStreamMessage =
         DiagnosticsStreamMessage.convertToDiagnosticsStreamMessage(metricsSnapshot);
+    Assert.assertEquals(convertedDiagnosticsStreamMessage, diagnosticsStreamMessage);
+  }
 
-    Assert.assertTrue(convertedDiagnosticsStreamMessage.equals(diagnosticsStreamMessage));
+  @Test
+  public void testSerdeEmptySamzaEpochIdInHeader() {
+    DiagnosticsStreamMessage diagnosticsStreamMessage = getDiagnosticsStreamMessage(Optional.empty());
+    MetricsSnapshot metricsSnapshot = diagnosticsStreamMessage.convertToMetricsSnapshot();
+    MetricsHeader expectedHeader =
+        new MetricsHeader(JOB_NAME, JOB_ID, CONTAINER_NAME, EXECUTION_ENV_CONTAINER_ID, Optional.empty(),
+            DiagnosticsManager.class.getName(), TASK_CLASS_VERSION, SAMZA_VERSION, HOSTNAME, timestamp, resetTimestamp);
+    Assert.assertEquals(expectedHeader, metricsSnapshot.getHeader());
+    // Converting the snapshot back must yield a message equal to the one it was created from.
+    DiagnosticsStreamMessage convertedDiagnosticsStreamMessage =
+        DiagnosticsStreamMessage.convertToDiagnosticsStreamMessage(metricsSnapshot);
+    Assert.assertEquals(diagnosticsStreamMessage, convertedDiagnosticsStreamMessage);
   }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetricsHeader.java b/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetricsHeader.java
index bbd6b30..23ee2a1 100644
--- a/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetricsHeader.java
+++ b/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetricsHeader.java
@@ -20,6 +20,7 @@ package org.apache.samza.metrics.reporter;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -30,6 +31,7 @@ public class TestMetricsHeader {
   private static final String JOB_ID = "id-a";
   private static final String CONTAINER_NAME = "samza-container-0";
   private static final String EXEC_ENV_CONTAINER_ID = "container-12345";
+  private static final String SAMZA_EPOCH_ID = "epoch-12345";
   private static final String SOURCE = "metrics-source";
   private static final String VERSION = "1.2.3";
   private static final String SAMZA_VERSION = "4.5.6";
@@ -40,13 +42,14 @@ public class TestMetricsHeader {
   @Test
   public void testGetAsMap() {
     MetricsHeader metricsHeader =
-        new MetricsHeader(JOB_NAME, JOB_ID, CONTAINER_NAME, EXEC_ENV_CONTAINER_ID, SOURCE, VERSION, SAMZA_VERSION, HOST,
-            TIME, RESET_TIME);
+        new MetricsHeader(JOB_NAME, JOB_ID, CONTAINER_NAME, EXEC_ENV_CONTAINER_ID, Optional.of(SAMZA_EPOCH_ID),
+            SOURCE, VERSION, SAMZA_VERSION, HOST, TIME, RESET_TIME);
     Map<String, Object> expected = new HashMap<>();
     expected.put("job-name", JOB_NAME);
     expected.put("job-id", JOB_ID);
     expected.put("container-name", CONTAINER_NAME);
     expected.put("exec-env-container-id", EXEC_ENV_CONTAINER_ID);
+    expected.put("samza-epoch-id", SAMZA_EPOCH_ID);
     expected.put("source", SOURCE);
     expected.put("version", VERSION);
     expected.put("samza-version", SAMZA_VERSION);
@@ -54,6 +57,13 @@ public class TestMetricsHeader {
     expected.put("time", TIME);
     expected.put("reset-time", RESET_TIME);
     assertEquals(expected, metricsHeader.getAsMap());
+
+    // test with empty samza epoch id
+    metricsHeader =
+        new MetricsHeader(JOB_NAME, JOB_ID, CONTAINER_NAME, EXEC_ENV_CONTAINER_ID, Optional.empty(), SOURCE, VERSION,
+            SAMZA_VERSION, HOST, TIME, RESET_TIME);
+    expected.remove("samza-epoch-id");
+    assertEquals(expected, metricsHeader.getAsMap());
   }
 
   @Test
@@ -63,6 +73,7 @@ public class TestMetricsHeader {
     map.put("job-id", JOB_ID);
     map.put("container-name", CONTAINER_NAME);
     map.put("exec-env-container-id", EXEC_ENV_CONTAINER_ID);
+    map.put("samza-epoch-id", SAMZA_EPOCH_ID);
     map.put("source", SOURCE);
     map.put("version", VERSION);
     map.put("samza-version", SAMZA_VERSION);
@@ -70,8 +81,15 @@ public class TestMetricsHeader {
     map.put("time", TIME);
     map.put("reset-time", RESET_TIME);
     MetricsHeader expected =
-        new MetricsHeader(JOB_NAME, JOB_ID, CONTAINER_NAME, EXEC_ENV_CONTAINER_ID, SOURCE, VERSION, SAMZA_VERSION, HOST,
-            TIME, RESET_TIME);
+        new MetricsHeader(JOB_NAME, JOB_ID, CONTAINER_NAME, EXEC_ENV_CONTAINER_ID, Optional.of(SAMZA_EPOCH_ID),
+            SOURCE, VERSION, SAMZA_VERSION, HOST, TIME, RESET_TIME);
+    assertEquals(expected, MetricsHeader.fromMap(map));
+
+    // test with missing samza epoch id
+    map.remove("samza-epoch-id");
+    expected =
+        new MetricsHeader(JOB_NAME, JOB_ID, CONTAINER_NAME, EXEC_ENV_CONTAINER_ID, Optional.empty(), SOURCE, VERSION,
+            SAMZA_VERSION, HOST, TIME, RESET_TIME);
     assertEquals(expected, MetricsHeader.fromMap(map));
   }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetricsSnapshot.java b/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetricsSnapshot.java
index 9b5c693..43e3cf8 100644
--- a/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetricsSnapshot.java
+++ b/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetricsSnapshot.java
@@ -19,6 +19,7 @@
 package org.apache.samza.metrics.reporter;
 
 import java.util.Map;
+import java.util.Optional;
 import com.google.common.collect.ImmutableMap;
 import org.junit.Test;
 
@@ -27,7 +28,8 @@ import static org.junit.Assert.assertEquals;
 
 public class TestMetricsSnapshot {
   private static final MetricsHeader METRICS_HEADER =
-      new MetricsHeader("job", "id", "container", "container-id", "source", "1.2", "3.4", "a.b.c", 100, 10);
+      new MetricsHeader("job", "id", "container", "container-id", Optional.of("epoch-123"), "source", "1.2", "3.4",
+          "a.b.c", 100, 10);
   private static final Metrics METRICS =
       new Metrics(ImmutableMap.of("group0", ImmutableMap.of("a", "b"), "group1", ImmutableMap.of("c", "d")));
 
diff --git a/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetricsSnapshotReporter.java b/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetricsSnapshotReporter.java
index c761486..09a72b6 100644
--- a/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetricsSnapshotReporter.java
+++ b/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestMetricsSnapshotReporter.java
@@ -30,11 +30,12 @@ import org.apache.samza.serializers.Serializer;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.SystemStream;
-import org.apache.samza.util.SystemClock;
+import org.apache.samza.util.Clock;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
 
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
@@ -58,13 +59,18 @@ public class TestMetricsSnapshotReporter {
   private static final String SAMZA_VERSION = "test samza version";
   private static final String HOSTNAME = "test host";
   private static final Duration REPORTING_INTERVAL = Duration.ofSeconds(60000);
+  private static final long RESET_TIME = 10;
+  private static final long SEND_TIME = 20;
 
   private Serializer<MetricsSnapshot> serializer;
   private SystemProducer producer;
+  private Clock clock;
 
   @Before
   public void setup() {
     producer = mock(SystemProducer.class);
+    clock = mock(Clock.class);
+    Mockito.when(clock.currentTimeMillis()).thenReturn(RESET_TIME, SEND_TIME);
     serializer = new MetricsSnapshotSerdeV2();
   }
 
@@ -162,13 +168,10 @@ public class TestMetricsSnapshotReporter {
 
     MetricsSnapshot metricsSnapshot = (MetricsSnapshot) envelopes.get(0).getMessage();
 
-    Assert.assertEquals(JOB_NAME, metricsSnapshot.getHeader().getJobName());
-    Assert.assertEquals(JOB_ID, metricsSnapshot.getHeader().getJobId());
-    Assert.assertEquals(CONTAINER_NAME, metricsSnapshot.getHeader().getContainerName());
-    Assert.assertEquals(source, metricsSnapshot.getHeader().getSource());
-    Assert.assertEquals(SAMZA_VERSION, metricsSnapshot.getHeader().getSamzaVersion());
-    Assert.assertEquals(TASK_VERSION, metricsSnapshot.getHeader().getVersion());
-    Assert.assertEquals(HOSTNAME, metricsSnapshot.getHeader().getHost());
+    MetricsHeader expectedHeader =
+        new MetricsHeader(JOB_NAME, JOB_ID, CONTAINER_NAME, "", Optional.of(""), source, TASK_VERSION, SAMZA_VERSION,
+            HOSTNAME, SEND_TIME, RESET_TIME);
+    Assert.assertEquals(expectedHeader, metricsSnapshot.getHeader());
 
     Map<String, Map<String, Object>> metricMap = metricsSnapshot.getMetrics().getAsMap();
     Assert.assertEquals(1, metricMap.size());
@@ -179,6 +182,6 @@ public class TestMetricsSnapshotReporter {
 
   private MetricsSnapshotReporter getMetricsSnapshotReporter(Optional<String> blacklist) {
     return new MetricsSnapshotReporter(producer, SYSTEM_STREAM, REPORTING_INTERVAL, JOB_NAME, JOB_ID, CONTAINER_NAME,
-        TASK_VERSION, SAMZA_VERSION, HOSTNAME, serializer, blacklist.map(Pattern::compile), SystemClock.instance());
+        TASK_VERSION, SAMZA_VERSION, HOSTNAME, serializer, blacklist.map(Pattern::compile), this.clock);
   }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/serializers/TestMetricsSnapshotSerde.java b/samza-core/src/test/java/org/apache/samza/serializers/TestMetricsSnapshotSerde.java
index b75cbde..35556e9 100644
--- a/samza-core/src/test/java/org/apache/samza/serializers/TestMetricsSnapshotSerde.java
+++ b/samza-core/src/test/java/org/apache/samza/serializers/TestMetricsSnapshotSerde.java
@@ -21,61 +21,88 @@ package org.apache.samza.serializers;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import org.apache.samza.metrics.reporter.Metrics;
 import org.apache.samza.metrics.reporter.MetricsHeader;
 import org.apache.samza.metrics.reporter.MetricsSnapshot;
 import org.junit.Test;
 
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
 
 public class TestMetricsSnapshotSerde {
-  private static final String SERIALIZED =
-      "{\"header\":{\"job-id\":\"testjobid\",\"exec-env-container-id\":\"test exec env container id\",\"samza-version\":\"samzaversion\",\"job-name\":\"test-jobName\",\"host\":\"host\",\"reset-time\":2,\"container-name\":\"samza-container-0\",\"source\":\"test source\",\"time\":1,\"version\":\"version\"},\"metrics\":{\"test\":{\"test2\":\"foo\"}}}";
+  @Test
+  public void testSerializeThenDeserialize() {
+    MetricsSnapshot snapshot = metricsSnapshot(true);
+    MetricsSnapshotSerde serde = new MetricsSnapshotSerde();
+    byte[] bytes = serde.toBytes(snapshot);
+    assertEquals(snapshot, serde.fromBytes(bytes));
+  }
 
   @Test
-  public void testMetricsSerdeSerializeAndDeserialize() {
-    Map<String, Object> metricsMap = new HashMap<>();
-    metricsMap.put("test2", "foo");
-    Map<String, Map<String, Object>> metricsGroupMap = new HashMap<>();
-    metricsGroupMap.put("test", metricsMap);
-    MetricsSnapshot snapshot = new MetricsSnapshot(metricsHeader(), Metrics.fromMap(metricsGroupMap));
+  public void testSerializeThenDeserializeEmptySamzaEpochIdInHeader() {
+    MetricsSnapshot snapshot = metricsSnapshot(false); // false: header built via the overload without an epoch id
     MetricsSnapshotSerde serde = new MetricsSnapshotSerde();
     byte[] bytes = serde.toBytes(snapshot);
     assertEquals(snapshot, serde.fromBytes(bytes));
   }
 
   /**
-   * Helps for testing compatibility against older versions of code.
+   * Helps for verifying compatibility when schemas evolve.
+   *
+   * Maps have non-deterministic ordering when serialized, so it is difficult to check exact serialized results. It
+   * isn't really necessary to check the serialized results anyways. We just need to make sure serialized data can be
+   * read by old and new systems.
    */
   @Test
-  public void testMetricsSerdeSerialize() {
-    Map<String, Object> metricsMap = new HashMap<>();
-    metricsMap.put("test2", "foo");
-    Map<String, Map<String, Object>> metricsGroupMap = new HashMap<>();
-    metricsGroupMap.put("test", metricsMap);
-    MetricsSnapshot snapshot = new MetricsSnapshot(metricsHeader(), Metrics.fromMap(metricsGroupMap));
+  public void testDeserializeRaw() {
+    MetricsSnapshot snapshot = metricsSnapshot(true);
     MetricsSnapshotSerde serde = new MetricsSnapshotSerde();
-    assertArrayEquals(SERIALIZED.getBytes(StandardCharsets.UTF_8), serde.toBytes(snapshot));
+    assertEquals(snapshot, serde.fromBytes(expectedSeralizedSnapshot(true, false).getBytes(StandardCharsets.UTF_8)));
+    assertEquals(snapshot, serde.fromBytes(expectedSeralizedSnapshot(true, true).getBytes(StandardCharsets.UTF_8)));
   }
 
   /**
-   * Helps for testing compatibility against older versions of code.
+   * Helps for verifying compatibility when schemas evolve.
    */
   @Test
-  public void testMetricsSerdeDeserialize() {
+  public void testDeserializeRawEmptySamzaEpochIdInHeader() {
+    MetricsSnapshot snapshot = metricsSnapshot(false);
+    MetricsSnapshotSerde serde = new MetricsSnapshotSerde();
+    assertEquals(snapshot, serde.fromBytes(expectedSeralizedSnapshot(false, false).getBytes(StandardCharsets.UTF_8)));
+    assertEquals(snapshot, serde.fromBytes(expectedSeralizedSnapshot(false, true).getBytes(StandardCharsets.UTF_8)));
+  }
+
+  private static String expectedSeralizedSnapshot(boolean includeSamzaEpochId,
+      boolean includeExtraHeaderField) {
+    String serializedSnapshot =
+        "{\"header\":{\"job-id\":\"testjobid\",\"exec-env-container-id\":\"test exec env container id\",";
+    if (includeSamzaEpochId) {
+      serializedSnapshot += "\"samza-epoch-id\":\"epoch-123\",";
+    }
+    if (includeExtraHeaderField) {
+      serializedSnapshot += "\"extra-header-field\":\"extra header value\",";
+    }
+    serializedSnapshot +=
+        "\"samza-version\":\"samzaversion\",\"job-name\":\"test-jobName\",\"host\":\"host\",\"reset-time\":2,"
+            + "\"container-name\":\"samza-container-0\",\"source\":\"test source\",\"time\":1,\"version\":\"version\"},"
+            + "\"metrics\":{\"test\":{\"test2\":\"foo\"}}}";
+    return serializedSnapshot;
+  }
+
+  private static MetricsSnapshot metricsSnapshot(boolean includeSamzaEpochId) {
+    MetricsHeader metricsHeader;
+    if (includeSamzaEpochId) {
+      metricsHeader = new MetricsHeader("test-jobName", "testjobid", "samza-container-0", "test exec env container id",
+          Optional.of("epoch-123"), "test source", "version", "samzaversion", "host", 1L, 2L);
+    } else {
+      metricsHeader = new MetricsHeader("test-jobName", "testjobid", "samza-container-0", "test exec env container id",
+          "test source", "version", "samzaversion", "host", 1L, 2L);
+    }
     Map<String, Object> metricsMap = new HashMap<>();
     metricsMap.put("test2", "foo");
     Map<String, Map<String, Object>> metricsGroupMap = new HashMap<>();
     metricsGroupMap.put("test", metricsMap);
-    MetricsSnapshot snapshot = new MetricsSnapshot(metricsHeader(), Metrics.fromMap(metricsGroupMap));
-    MetricsSnapshotSerde serde = new MetricsSnapshotSerde();
-    assertEquals(snapshot, serde.fromBytes(SERIALIZED.getBytes(StandardCharsets.UTF_8)));
-  }
-
-  private static MetricsHeader metricsHeader() {
-    return new MetricsHeader("test-jobName", "testjobid", "samza-container-0", "test exec env container id",
-        "test source", "version", "samzaversion", "host", 1L, 2L);
+    return new MetricsSnapshot(metricsHeader, Metrics.fromMap(metricsGroupMap));
   }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestMetricsSnapshotSerdeV2.java b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestMetricsSnapshotSerdeV2.java
index 6a3dc21..f0b8ab5 100644
--- a/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestMetricsSnapshotSerdeV2.java
+++ b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestMetricsSnapshotSerdeV2.java
@@ -22,6 +22,7 @@ package org.apache.samza.serializers.model.serializers;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.samza.SamzaException;
 import org.apache.samza.diagnostics.BoundedList;
@@ -32,15 +33,14 @@ import org.apache.samza.metrics.reporter.MetricsSnapshot;
 import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
 import org.junit.Test;
 
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
 
 public class TestMetricsSnapshotSerdeV2 {
   @Test
-  public void testSerializeAndDeserialize() {
+  public void testSerializeThenDeserialize() {
     SamzaException samzaException = new SamzaException("this is a samza exception", new RuntimeException("cause"));
-    MetricsSnapshot metricsSnapshot = metricsSnapshot(samzaException);
+    MetricsSnapshot metricsSnapshot = metricsSnapshot(samzaException, true);
     MetricsSnapshotSerdeV2 metricsSnapshotSerde = new MetricsSnapshotSerdeV2();
     byte[] serializedBytes = metricsSnapshotSerde.toBytes(metricsSnapshot);
     MetricsSnapshot deserializedMetricsSnapshot = metricsSnapshotSerde.fromBytes(serializedBytes);
@@ -48,34 +48,64 @@ public class TestMetricsSnapshotSerdeV2 {
   }
 
   @Test
-  public void testSerialize() {
+  public void testSerializeThenDeserializeEmptySamzaEpochIdInHeader() {
     SamzaException samzaException = new SamzaException("this is a samza exception", new RuntimeException("cause"));
-    MetricsSnapshot metricsSnapshot = metricsSnapshot(samzaException);
+    MetricsSnapshot metricsSnapshot = metricsSnapshot(samzaException, false);
     MetricsSnapshotSerdeV2 metricsSnapshotSerde = new MetricsSnapshotSerdeV2();
     byte[] serializedBytes = metricsSnapshotSerde.toBytes(metricsSnapshot);
-    assertArrayEquals(expectedSeralizedSnapshot(samzaException).getBytes(StandardCharsets.UTF_8), serializedBytes);
+    MetricsSnapshot deserializedMetricsSnapshot = metricsSnapshotSerde.fromBytes(serializedBytes);
+    assertEquals(metricsSnapshot, deserializedMetricsSnapshot);
+  }
+
+  /**
+   * Helps for verifying compatibility when schemas evolve.
+   *
+   * Maps have non-deterministic ordering when serialized, so it is difficult to check exact serialized results. It
+   * isn't really necessary to check the serialized results anyways. We just need to make sure serialized data can be
+   * read by old and new systems.
+   */
+  @Test
+  public void testDeserializeRaw() {
+    SamzaException samzaException = new SamzaException("this is a samza exception", new RuntimeException("cause"));
+    MetricsSnapshot metricsSnapshot = metricsSnapshot(samzaException, true);
+    MetricsSnapshotSerdeV2 metricsSnapshotSerde = new MetricsSnapshotSerdeV2();
+    assertEquals(metricsSnapshot, metricsSnapshotSerde.fromBytes(
+        expectedSeralizedSnapshot(samzaException, true, false).getBytes(StandardCharsets.UTF_8)));
+    assertEquals(metricsSnapshot, metricsSnapshotSerde.fromBytes(
+        expectedSeralizedSnapshot(samzaException, true, true).getBytes(StandardCharsets.UTF_8)));
   }
 
+  /**
+   * Helps for verifying compatibility when schemas evolve.
+   */
   @Test
-  public void testDeserialize() {
+  public void testDeserializeRawEmptySamzaEpochIdInHeader() {
     SamzaException samzaException = new SamzaException("this is a samza exception", new RuntimeException("cause"));
-    MetricsSnapshot expectedSnapshot = metricsSnapshot(samzaException);
+    MetricsSnapshot metricsSnapshot = metricsSnapshot(samzaException, false);
     MetricsSnapshotSerdeV2 metricsSnapshotSerde = new MetricsSnapshotSerdeV2();
-    MetricsSnapshot deserializedSnapshot =
-        metricsSnapshotSerde.fromBytes(expectedSeralizedSnapshot(samzaException).getBytes(StandardCharsets.UTF_8));
-    assertEquals(expectedSnapshot, deserializedSnapshot);
+    assertEquals(metricsSnapshot, metricsSnapshotSerde.fromBytes(
+        expectedSeralizedSnapshot(samzaException, false, false).getBytes(StandardCharsets.UTF_8)));
+    assertEquals(metricsSnapshot, metricsSnapshotSerde.fromBytes(
+        expectedSeralizedSnapshot(samzaException, false, true).getBytes(StandardCharsets.UTF_8)));
   }
 
-  private static MetricsSnapshot metricsSnapshot(Exception exception) {
-    MetricsHeader metricsHeader =
-        new MetricsHeader("jobName", "i001", "container 0", "test container ID", "source", "300.14.25.1", "1", "1", 1,
-            1);
+  private static MetricsSnapshot metricsSnapshot(Exception exception, boolean includeSamzaEpochId) {
+    MetricsHeader metricsHeader;
+    if (includeSamzaEpochId) {
+      metricsHeader =
+          new MetricsHeader("jobName", "i001", "container 0", "test container ID", Optional.of("epoch-123"),
+              "source", "300.14.25.1", "1", "1", 1, 1);
+    } else {
+      metricsHeader =
+          new MetricsHeader("jobName", "i001", "container 0", "test container ID", "source", "300.14.25.1", "1", "1", 1,
+              1);
+    }
     BoundedList<DiagnosticsExceptionEvent> boundedList = new BoundedList<>("exceptions");
     DiagnosticsExceptionEvent diagnosticsExceptionEvent = new DiagnosticsExceptionEvent(1, exception, new HashMap<>());
     boundedList.add(diagnosticsExceptionEvent);
     Map<String, Map<String, Object>> metricMessage = new HashMap<>();
     Map<String, Object> samzaContainerMetrics = new HashMap<>();
-    samzaContainerMetrics.put("commit-calls", 0);
+    samzaContainerMetrics.put("commit-calls", 1);
     metricMessage.put("org.apache.samza.container.SamzaContainerMetrics", samzaContainerMetrics);
     Map<String, Object> exceptions = new HashMap<>();
     exceptions.put("exceptions", boundedList.getValues());
@@ -83,13 +113,34 @@ public class TestMetricsSnapshotSerdeV2 {
     return new MetricsSnapshot(metricsHeader, new Metrics(metricMessage));
   }
 
-  private static String expectedSeralizedSnapshot(Exception exception) {
+  /**
+   * @param includeSamzaEpochId whether to include the new samza-epoch-id field (pass false to test that new code
+   *                            can read old data without this field)
+   * @param includeExtraHeaderField whether to include an extra, unknown header field (pass true to test that old
+   *                                code can read new data with extra fields)
+   */
+  private static String expectedSeralizedSnapshot(Exception exception, boolean includeSamzaEpochId,
+      boolean includeExtraHeaderField) {
     String stackTrace = ExceptionUtils.getStackTrace(exception);
+    String serializedSnapshot =
+        "{\"header\":[\"java.util.HashMap\",{\"job-id\":\"i001\",\"exec-env-container-id\":\"test container ID\",";
+    if (includeSamzaEpochId) {
+      serializedSnapshot += "\"samza-epoch-id\":\"epoch-123\",";
+    }
+    if (includeExtraHeaderField) {
+      serializedSnapshot += "\"extra-header-field\":\"extra header value\",";
+    }
     // in serialized string, backslash in whitespace characters (e.g. \n, \t) are escaped
     String escapedStackTrace = stackTrace.replace("\n", "\\n").replace("\t", "\\t");
-    return
-        "{\"header\":[\"java.util.HashMap\",{\"job-id\":\"i001\",\"exec-env-container-id\":\"test container ID\",\"samza-version\":\"1\",\"job-name\":\"jobName\",\"host\":\"1\",\"reset-time\":[\"java.lang.Long\",1],\"container-name\":\"container 0\",\"source\":\"source\",\"time\":[\"java.lang.Long\",1],\"version\":\"300.14.25.1\"}],\"metrics\":[\"java.util.HashMap\",{\"org.apache.samza.exceptions\":[\"java.util.HashMap\",{\"exceptions\":[\"java.util.Collections$UnmodifiableRandomAccessLi [...]
-            + escapedStackTrace
-            + "\",\"mdcMap\":[\"java.util.HashMap\",{}]}]]]}],\"org.apache.samza.container.SamzaContainerMetrics\":[\"java.util.HashMap\",{\"commit-calls\":0}]}]}";
+    serializedSnapshot +=
+        "\"samza-version\":\"1\",\"job-name\":\"jobName\",\"host\":\"1\",\"reset-time\":[\"java.lang.Long\",1],"
+            + "\"container-name\":\"container 0\",\"source\":\"source\",\"time\":[\"java.lang.Long\",1],\"version\":\"300.14.25.1\"}],"
+            + "\"metrics\":[\"java.util.HashMap\",{\"org.apache.samza.exceptions\":"
+            + "[\"java.util.HashMap\",{\"exceptions\":[\"java.util.Collections$UnmodifiableRandomAccessList\","
+            + "[[\"org.apache.samza.diagnostics.DiagnosticsExceptionEvent\",{\"timestamp\":1,\"exceptionType\":\"org.apache.samza.SamzaException\","
+            + "\"exceptionMessage\":\"this is a samza exception\",\"compactExceptionStackTrace\":\"" + escapedStackTrace
+            + "\",\"mdcMap\":[\"java.util.HashMap\",{}]}]]]}],\"org.apache.samza.container.SamzaContainerMetrics\":"
+            + "[\"java.util.HashMap\",{\"commit-calls\":1}]}]}";
+    return serializedSnapshot;
   }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/util/TestDiagnosticsUtil.java b/samza-core/src/test/java/org/apache/samza/util/TestDiagnosticsUtil.java
index e10c551..09bca34 100644
--- a/samza-core/src/test/java/org/apache/samza/util/TestDiagnosticsUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/util/TestDiagnosticsUtil.java
@@ -50,6 +50,7 @@ public class TestDiagnosticsUtil {
   private static final String JOB_ID = "someId";
   private static final String CONTAINER_ID = "someContainerId";
   private static final String ENV_ID = "someEnvID";
+  private static final String SAMZA_EPOCH_ID = "someEpochID";
   public static final String REPORTER_FACTORY = "org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory";
   public static final String SYSTEM_FACTORY = "com.foo.system.SomeSystemFactory";
 
@@ -65,7 +66,7 @@ public class TestDiagnosticsUtil {
 
     Optional<DiagnosticsManager> diagnosticsManager =
         DiagnosticsUtil.buildDiagnosticsManager(JOB_NAME, JOB_ID, mockJobModel, CONTAINER_ID, Optional.of(ENV_ID),
-            config);
+            Optional.of(SAMZA_EPOCH_ID), config);
 
     Assert.assertTrue(diagnosticsManager.isPresent());
   }