You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2019/05/23 23:41:38 UTC

[samza] branch master updated: SAMZA-2213: Adding writing of container.metadata file, and moving exception writing logic to DiagnosticsManager

This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 66f29e8  SAMZA-2213: Adding writing of container.metadata file, and moving exception writing logic to DiagnosticsManager
66f29e8 is described below

commit 66f29e8287ae0f673a75b65db9c10db8e58819dc
Author: Ray Matharu <rm...@linkedin.com>
AuthorDate: Thu May 23 16:41:30 2019 -0700

    SAMZA-2213: Adding writing of container.metadata file, and moving exception writing logic to DiagnosticsManager
    
    This PR
    * Removes ListGauge from everywhere.
    * Adds logic to LocalContainerRunner to write metadata, which is useful when combined with Java's -XX:OnOutOfMemoryError=string option.
    
    * Adds DiagnosticsManager to publish error info to diagnostics stream instead of metrics reporter.
    * Adds logic to SamzaContainer to wire the DiagnosticsManager while re-using the SystemProducer from the MetricsReporter.
    
    * Cleans up ContainerProcessManagerMetrics.
    
    * Adds DiagnosticsManager to ContainerProcessManager and the ClusterBasedJobCoordinator.
    
    Author: Ray Matharu <rm...@linkedin.com>
    Author: rmatharu <40...@users.noreply.github.com>
    
    Reviewers: Prateek Maheshwari <pm...@apache.org>
    
    Closes #1021 from rmatharu/job-metadata
---
 .../versioned/operations/monitoring.md             |   2 +-
 .../org/apache/samza/metrics/MetricsRegistry.java  |   9 --
 .../org/apache/samza/metrics/MetricsVisitor.java   |   6 +-
 .../metrics/ReadableMetricsRegistryListener.java   |   2 -
 .../org/apache/samza/util/NoOpMetricsRegistry.java |   6 -
 .../samza/system/eventhub/TestMetricsRegistry.java |   9 --
 .../clustermanager/ClusterBasedJobCoordinator.java |  10 +-
 .../clustermanager/ContainerProcessManager.java    | 109 +++++++++----
 .../java/org/apache/samza/metrics/MetricGroup.java |   4 -
 .../apache/samza/processor/StreamProcessor.java    |  20 ++-
 .../apache/samza/runtime/ContainerLaunchUtil.java  |  35 ++--
 .../apache/samza/runtime/LocalContainerRunner.java |  12 +-
 .../org/apache/samza/util/DiagnosticsUtil.java     | 126 +++++++++++++++
 .../apache/samza/util/MetricsReporterLoader.java   |  12 +-
 .../scala/org/apache/samza/config/JobConfig.scala  |  18 ++-
 .../org/apache/samza/config/MetricsConfig.scala    |   2 +-
 .../scala/org/apache/samza/config/TaskConfig.scala |   5 -
 .../apache/samza/container/SamzaContainer.scala    |  44 +++---
 .../samza/container/SamzaContainerMetrics.scala    |   2 -
 .../org/apache/samza/diagnostics/BoundedList.java  |  39 +++--
 .../diagnostics/DiagnosticsExceptionEvent.java     |   2 -
 .../samza/diagnostics/DiagnosticsManager.java      | 176 +++++++++++++++++++++
 .../metrics/ContainerProcessManagerMetrics.scala   |  89 ++++-------
 .../org/apache/samza/metrics/MetricsHelper.scala   |   2 -
 .../apache/samza/metrics/MetricsRegistryMap.scala  |  15 --
 .../samza/metrics/reporter/JmxReporter.scala       |  15 --
 .../metrics/reporter/MetricsSnapshotReporter.scala |   2 -
 .../reporter/MetricsSnapshotReporterFactory.scala  |  23 +--
 .../scala/org/apache/samza/util/FileUtil.scala     |  47 +++++-
 .../main/scala/org/apache/samza/util/Util.scala    |  29 +++-
 .../TestContainerProcessManager.java               |  40 ++---
 .../serializers/TestMetricsSnapshotSerdeV2.java    |   8 +-
 .../org/apache/samza/metrics/TestBoundedList.java  |  37 ++---
 .../logging/log4j/SimpleDiagnosticsAppender.java   |  14 +-
 .../logging/log4j2/SimpleDiagnosticsAppender.java  |  14 +-
 .../samza/sql/util/TestMetricsRegistryImpl.java    |   8 -
 .../java/org/apache/samza/config/YarnConfig.java   |   2 +-
 .../webapp/ApplicationMasterRestServlet.scala      |   3 -
 38 files changed, 675 insertions(+), 323 deletions(-)

diff --git a/docs/learn/documentation/versioned/operations/monitoring.md b/docs/learn/documentation/versioned/operations/monitoring.md
index ec8430c..bbad335 100644
--- a/docs/learn/documentation/versioned/operations/monitoring.md
+++ b/docs/learn/documentation/versioned/operations/monitoring.md
@@ -223,7 +223,7 @@ _Gauges_ are useful when measuring the magnitude of a certain system property, e
 
 _Counters_ are useful in measuring metrics that are cumulative values, e.g., the number of messages processed since container startup. Certain counters are also useful when visualized with their rate-of-change, e.g., the rate of message processing.
 
-_Timers_ are useful for storing and reporting a sliding-window of timing values. Samza also supports a ListGauge type metric, which can be used to store and report a list of any primitive-type such as strings.
+_Timers_ are useful for storing and reporting a sliding-window of timing values. 
 
 ## <a name="userdefinedmetrics"></a> C. Adding User-Defined Metrics
 
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java b/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java
index fa0fd39..5a00d01 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java
+++ b/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java
@@ -65,15 +65,6 @@ public interface MetricsRegistry {
   <T> Gauge<T> newGauge(String group, Gauge<T> value);
 
   /**
-   * Register a {@link org.apache.samza.metrics.ListGauge}
-   * @param group Group for this ListGauge
-   * @param listGauge the ListGauge to register
-   * @param <T> Type of the ListGauge
-   * @return ListGauge registered
-   */
-  <T> ListGauge<T> newListGauge(String group, ListGauge<T> listGauge);
-
-  /**
    * Create and Register a new {@link org.apache.samza.metrics.Timer}
    * @param group Group for this Timer
    * @param name Name of to-be-created Timer
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java b/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java
index 49a0929..40ebb3b 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java
+++ b/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java
@@ -31,13 +31,9 @@ public abstract class MetricsVisitor {
 
   public abstract void timer(Timer timer);
 
-  public abstract <T> void listGauge(ListGauge<T> listGauge);
 
   public void visit(Metric metric) {
-    // Cast for metrics of type ListGauge
-    if (metric instanceof ListGauge<?>) {
-      listGauge((ListGauge<?>) metric);
-    } else if (metric instanceof Counter) {
+    if (metric instanceof Counter) {
       counter((Counter) metric);
     } else if (metric instanceof Gauge<?>) {
       gauge((Gauge<?>) metric);
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java b/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java
index ba5b182..739d68f 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java
+++ b/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java
@@ -24,7 +24,5 @@ public interface ReadableMetricsRegistryListener {
 
   void onGauge(String group, Gauge<?> gauge);
 
-  void onListGauge(String group, ListGauge<?> listGauge);
-
   void onTimer(String group, Timer timer);
 }
diff --git a/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java b/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java
index 76b8216..068fe72 100644
--- a/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java
+++ b/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java
@@ -21,7 +21,6 @@ package org.apache.samza.util;
 
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Gauge;
-import org.apache.samza.metrics.ListGauge;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.Timer;
 
@@ -52,11 +51,6 @@ public class NoOpMetricsRegistry implements MetricsRegistry {
   }
 
   @Override
-  public <T> ListGauge<T> newListGauge(String group, ListGauge<T> listGauge) {
-    return listGauge;
-  }
-
-  @Override
   public Timer newTimer(String group, String name) {
     return new Timer(name);
   }
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java
index 01b69ed..48f6edd 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java
@@ -25,7 +25,6 @@ import java.util.Map;
 import org.apache.commons.collections4.map.HashedMap;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Gauge;
-import org.apache.samza.metrics.ListGauge;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.Timer;
 
@@ -33,7 +32,6 @@ public class TestMetricsRegistry implements MetricsRegistry {
 
   private Map<String, List<Counter>> counters = new HashedMap<>();
   private Map<String, List<Gauge<?>>> gauges = new HashedMap<>();
-  private Map<String, List<ListGauge>> listGauges = new HashedMap<>();
 
   public List<Counter> getCounters(String groupName) {
     return counters.get(groupName);
@@ -70,13 +68,6 @@ public class TestMetricsRegistry implements MetricsRegistry {
   }
 
   @Override
-  public ListGauge newListGauge(String group, ListGauge listGauge) {
-    listGauges.putIfAbsent(group, new ArrayList());
-    listGauges.get(group).add(listGauge);
-    return listGauge;
-  }
-
-  @Override
   public <T> Gauge<T> newGauge(String group, Gauge<T> value) {
     if (!gauges.containsKey(group)) {
       gauges.put(group, new ArrayList<>());
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index 782a62a..9044361 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -57,6 +57,7 @@ import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.util.CoordinatorStreamUtil;
+import org.apache.samza.util.DiagnosticsUtil;
 import org.apache.samza.util.SystemClock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -90,6 +91,7 @@ import scala.collection.JavaConverters;
 public class ClusterBasedJobCoordinator {
 
   private static final Logger LOG = LoggerFactory.getLogger(ClusterBasedJobCoordinator.class);
+  private final static String METRICS_SOURCE_NAME = "ApplicationMaster";
 
   private final Config config;
   private final ClusterManagerConfig clusterManagerConfig;
@@ -221,7 +223,13 @@ public class ClusterBasedJobCoordinator {
       // initialize JobCoordinator state
       LOG.info("Starting cluster based job coordinator");
 
-      // create necessary checkpoint and changelog streams, if not created
+      // write the diagnostics metadata file
+      String jobName = new JobConfig(config).getName().get();
+      String jobId = new JobConfig(config).getJobId();
+      Optional<String> execEnvContainerId = Optional.ofNullable(System.getenv("CONTAINER_ID"));
+      DiagnosticsUtil.writeMetadataFile(jobName, jobId, METRICS_SOURCE_NAME, execEnvContainerId, config);
+
+      //create necessary checkpoint and changelog streams, if not created
       JobModel jobModel = jobModelManager.jobModel();
       MetadataResourceUtil metadataResourceUtil =
           new MetadataResourceUtil(jobModel, metrics);
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index 70b5579..5f48df5 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -19,20 +19,29 @@
 package org.apache.samza.clustermanager;
 
 import java.util.Optional;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.diagnostics.DiagnosticsManager;
 import org.apache.samza.metrics.ContainerProcessManagerMetrics;
+import org.apache.samza.metrics.JvmMetrics;
 import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
+import org.apache.samza.util.DiagnosticsUtil;
+import org.apache.samza.util.MetricsReporterLoader;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import scala.Option;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -57,6 +66,13 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
   private static final Logger log = LoggerFactory.getLogger(ContainerProcessManager.class);
 
   /**
+   * Metrics for the {@link ContainerProcessManager}
+   */
+  private final static String METRICS_SOURCE_NAME = "ApplicationMaster";
+  private final static String EXEC_ENV_CONTAINER_ID_SYS_PROPERTY = "CONTAINER_ID";
+
+
+  /**
    * Does this Samza Job need hostAffinity when containers are allocated.
    */
   private final boolean hostAffinityEnabled;
@@ -81,6 +97,8 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
   // The StandbyContainerManager manages standby-aware allocation and failover of containers
   private final Optional<StandbyContainerManager> standbyContainerManager;
 
+  private final Option<DiagnosticsManager> diagnosticsManager;
+
   /**
    * A standard interface to request resources.
    */
@@ -102,11 +120,9 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
    * value is the {@link ProcessorFailure} object that has a count of failures.
    */
   private final Map<String, ProcessorFailure> processorFailures = new HashMap<>();
-
-  /**
-   * Metrics for the {@link ContainerProcessManager}
-   */
-  private final ContainerProcessManagerMetrics metrics;
+  private ContainerProcessManagerMetrics containerProcessManagerMetrics;
+  private JvmMetrics jvmMetrics;
+  private Map<String, MetricsReporter> metricsReporters;
 
   public ContainerProcessManager(Config config, SamzaApplicationState state, MetricsRegistryMap registry) {
     this.state = state;
@@ -117,8 +133,30 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
 
     ResourceManagerFactory factory = getContainerProcessManagerFactory(clusterManagerConfig);
     this.clusterResourceManager = checkNotNull(factory.getClusterResourceManager(this, state));
-    this.metrics = new ContainerProcessManagerMetrics(config, state, registry);
 
+    // Initialize metrics
+    this.containerProcessManagerMetrics = new ContainerProcessManagerMetrics(config, state, registry);
+    this.jvmMetrics = new JvmMetrics(registry);
+    this.metricsReporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), METRICS_SOURCE_NAME);
+
+    // Creating diagnostics manager and reporter, and wiring it respectively
+    String jobName = new JobConfig(config).getName().get();
+    String jobId = new JobConfig(config).getJobId();
+    Optional<String> execEnvContainerId = Optional.ofNullable(System.getenv(EXEC_ENV_CONTAINER_ID_SYS_PROPERTY));
+    Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> diagnosticsManagerReporterPair =
+        DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, METRICS_SOURCE_NAME, execEnvContainerId, config);
+
+    if (diagnosticsManagerReporterPair.isPresent()) {
+      diagnosticsManager = Option.apply(diagnosticsManagerReporterPair.get().getKey());
+      metricsReporters.put(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS(), diagnosticsManagerReporterPair.get().getValue());
+    } else {
+      diagnosticsManager = Option.empty();
+    }
+
+    // Wire all metrics to all reporters
+    this.metricsReporters.values().forEach(reporter -> reporter.register(METRICS_SOURCE_NAME, registry));
+
+    // Enable standby container manager if required
     if (jobConfig.getStandbyTasksEnabled()) {
       this.standbyContainerManager = Optional.of(new StandbyContainerManager(state, clusterResourceManager));
     } else {
@@ -135,23 +173,9 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
     log.info("Finished container process manager initialization.");
   }
 
-  //for testing
-  ContainerProcessManager(Config config, SamzaApplicationState state, MetricsRegistryMap registry,
-      AbstractContainerAllocator allocator, ClusterResourceManager manager) {
-    this.state = state;
-    this.clusterManagerConfig = new ClusterManagerConfig(config);
-    this.jobConfig = new JobConfig(config);
-    this.hostAffinityEnabled = clusterManagerConfig.getHostAffinityEnabled();
-    this.clusterResourceManager = manager;
-    this.metrics = new ContainerProcessManagerMetrics(config, state, registry);
-    this.containerAllocator = allocator;
-    this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
-    this.standbyContainerManager = Optional.empty();
-  }
-
   //package private, used only in tests
   ContainerProcessManager(Config config, SamzaApplicationState state, MetricsRegistryMap registry,
-      ClusterResourceManager resourceManager) {
+      ClusterResourceManager resourceManager, Optional<AbstractContainerAllocator> allocator) {
     JobModelManager jobModelManager = state.jobModelManager;
     this.state = state;
     this.clusterManagerConfig = new ClusterManagerConfig(config);
@@ -160,10 +184,12 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
     this.hostAffinityEnabled = clusterManagerConfig.getHostAffinityEnabled();
 
     this.clusterResourceManager = resourceManager;
-    this.metrics = new ContainerProcessManagerMetrics(config, state, registry);
     this.standbyContainerManager = Optional.empty();
+    this.diagnosticsManager = Option.empty();
 
-    if (this.hostAffinityEnabled) {
+    if (allocator.isPresent()) {
+      this.containerAllocator = allocator.get();
+    } else if (this.hostAffinityEnabled) {
       this.containerAllocator = new HostAwareContainerAllocator(clusterResourceManager, clusterManagerConfig.getContainerRequestTimeout(), config, this.standbyContainerManager, state);
     } else {
       this.containerAllocator = new ContainerAllocator(clusterResourceManager, config, state);
@@ -186,7 +212,18 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
 
   public void start() {
     log.info("Starting the container process manager");
-    metrics.start();
+
+    if (jvmMetrics != null) {
+      jvmMetrics.start();
+    }
+
+    if (this.metricsReporters != null) {
+      this.metricsReporters.values().forEach(reporter -> reporter.start());
+    }
+
+    if (this.diagnosticsManager.isDefined()) {
+      this.diagnosticsManager.get().start();
+    }
 
     log.info("Starting the cluster resource manager");
     clusterResourceManager.start();
@@ -217,16 +254,30 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
       Thread.currentThread().interrupt();
     }
 
-    if (metrics != null) {
+    if (this.diagnosticsManager.isDefined()) {
       try {
-        metrics.stop();
-        log.info("Stopped metrics reporters");
-      } catch (Throwable e) {
-        log.error("Exception while stopping metrics", e);
+        this.diagnosticsManager.get().stop();
+      } catch (InterruptedException e) {
+        log.error("InterruptedException while stopping diagnosticsManager", e);
       }
     }
 
     try {
+
+      if (this.metricsReporters != null) {
+        this.metricsReporters.values().forEach(reporter -> reporter.stop());
+      }
+
+      if (this.jvmMetrics != null) {
+        jvmMetrics.stop();
+      }
+
+      log.info("Stopped containerProcessManagerMetrics reporters");
+    } catch (Throwable e) {
+      log.error("Exception while stopping containerProcessManagerMetrics", e);
+    }
+
+    try {
       clusterResourceManager.stop(state.status);
       log.info("Stopped the cluster resource manager");
     } catch (Throwable e) {
diff --git a/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java b/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java
index fc57846..53526d8 100644
--- a/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java
+++ b/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java
@@ -44,10 +44,6 @@ public class MetricGroup {
     return registry.newCounter(groupName, (prefix + name).toLowerCase());
   }
 
-  public <T> ListGauge<T> newListGauge(String name) {
-    return registry.newListGauge(groupName, new ListGauge(name));
-  }
-
   public <T> Gauge<T> newGauge(String name, T value) {
     return registry.newGauge(groupName, new Gauge<T>((prefix + name).toLowerCase(), value));
   }
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 2a8c862..c946bb5 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -31,9 +31,12 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.config.TaskConfigJava;
 import org.apache.samza.container.SamzaContainer;
 import org.apache.samza.container.SamzaContainerListener;
@@ -46,11 +49,14 @@ import org.apache.samza.context.JobContextImpl;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorFactory;
 import org.apache.samza.coordinator.JobCoordinatorListener;
+import org.apache.samza.diagnostics.DiagnosticsManager;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
 import org.apache.samza.runtime.ProcessorLifecycleListener;
 import org.apache.samza.task.TaskFactory;
+import org.apache.samza.util.DiagnosticsUtil;
 import org.apache.samza.util.ScalaJavaUtil;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
@@ -330,11 +336,23 @@ public class StreamProcessor {
 
   @VisibleForTesting
   SamzaContainer createSamzaContainer(String processorId, JobModel jobModel) {
+
+    // Creating diagnostics manager and reporter, and wiring it respectively
+    String jobName = new JobConfig(config).getName().get();
+    String jobId = new JobConfig(config).getJobId();
+    Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> diagnosticsManagerReporterPair =
+        DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, processorId, Optional.empty(), config);
+    Option<DiagnosticsManager> diagnosticsManager = Option.empty();
+    if (diagnosticsManagerReporterPair.isPresent()) {
+      diagnosticsManager = Option.apply(diagnosticsManagerReporterPair.get().getKey());
+      this.customMetricsReporter.put(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS(), diagnosticsManagerReporterPair.get().getValue());
+    }
+
     return SamzaContainer.apply(processorId, jobModel, ScalaJavaUtil.toScalaMap(this.customMetricsReporter),
         this.taskFactory, JobContextImpl.fromConfigWithDefaults(this.config),
         Option.apply(this.applicationDefinedContainerContextFactoryOptional.orElse(null)),
         Option.apply(this.applicationDefinedTaskContextFactoryOptional.orElse(null)),
-        Option.apply(this.externalContextOptional.orElse(null)), null, null);
+        Option.apply(this.externalContextOptional.orElse(null)), null, null, diagnosticsManager);
   }
 
   private JobCoordinator createJobCoordinator() {
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index 564dbf3..978ab24 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -19,11 +19,16 @@
 
 package org.apache.samza.runtime;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.descriptors.ApplicationDescriptor;
 import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.container.ContainerHeartbeatClient;
 import org.apache.samza.container.ContainerHeartbeatMonitor;
@@ -36,20 +41,20 @@ import org.apache.samza.context.JobContextImpl;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.diagnostics.DiagnosticsManager;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
 import org.apache.samza.startpoint.StartpointManager;
 import org.apache.samza.task.TaskFactory;
 import org.apache.samza.task.TaskFactoryUtil;
+import org.apache.samza.util.DiagnosticsUtil;
 import org.apache.samza.util.ScalaJavaUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
 
 public class ContainerLaunchUtil {
   private static final Logger log = LoggerFactory.getLogger(ContainerLaunchUtil.class);
@@ -64,18 +69,21 @@ public class ContainerLaunchUtil {
    */
   public static void run(
       ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc,
-      String containerId,
+      String jobName, String jobId, String containerId, Optional<String> execEnvContainerId,
       JobModel jobModel) {
 
     Config config = jobModel.getConfig();
-    run(appDesc, containerId, jobModel, config, buildExternalContext(config));
+    run(appDesc, jobName, jobId, containerId, execEnvContainerId, jobModel, config, buildExternalContext(config));
 
     System.exit(0);
   }
 
   private static void run(
       ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc,
+      String jobName,
+      String jobId,
       String containerId,
+      Optional<String> execEnvContainerId,
       JobModel jobModel,
       Config config,
       Optional<ExternalContext> externalContextOptional) {
@@ -90,15 +98,24 @@ public class ContainerLaunchUtil {
         startpointManager = Optional.of(new StartpointManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, StartpointManager.NAMESPACE)));
       }
 
+      Map<String, MetricsReporter> metricsReporters = loadMetricsReporters(appDesc, containerId, config);
+
+      // Creating diagnostics manager and reporter, and wiring it respectively
+      Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> diagnosticsManagerReporterPair = DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, containerId, execEnvContainerId, config);
+      Option<DiagnosticsManager> diagnosticsManager = Option.empty();
+      if (diagnosticsManagerReporterPair.isPresent()) {
+        diagnosticsManager = Option.apply(diagnosticsManagerReporterPair.get().getKey());
+        metricsReporters.put(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS(), diagnosticsManagerReporterPair.get().getValue());
+      }
+
       SamzaContainer container = SamzaContainer$.MODULE$.apply(
-          containerId,
-          jobModel,
-          ScalaJavaUtil.toScalaMap(loadMetricsReporters(appDesc, containerId, config)),
+          containerId, jobModel,
+          ScalaJavaUtil.toScalaMap(metricsReporters),
           taskFactory,
           JobContextImpl.fromConfigWithDefaults(config),
           Option.apply(appDesc.getApplicationContainerContextFactory().orElse(null)),
           Option.apply(appDesc.getApplicationTaskContextFactory().orElse(null)),
-          Option.apply(externalContextOptional.orElse(null)), localityManager, startpointManager.orElse(null));
+          Option.apply(externalContextOptional.orElse(null)), localityManager, startpointManager.orElse(null), diagnosticsManager);
 
       ProcessorLifecycleListener listener = appDesc.getProcessorLifecycleListenerFactory()
           .createInstance(new ProcessorContext() { }, config);
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index a3e5acf..5197dbc 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -19,6 +19,8 @@
 
 package org.apache.samza.runtime;
 
+import java.util.Optional;
+import java.util.Random;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.ApplicationUtil;
 import org.apache.samza.application.descriptors.ApplicationDescriptor;
@@ -29,11 +31,12 @@ import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.container.SamzaContainer;
 import org.apache.samza.job.model.JobModel;
+import org.apache.samza.util.DiagnosticsUtil;
 import org.apache.samza.util.SamzaUncaughtExceptionHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
-import java.util.Random;
+
 
 /**
  * Launches and manages the lifecycle for {@link SamzaContainer}s in YARN.
@@ -56,6 +59,8 @@ public class LocalContainerRunner {
     log.info(String.format("Got coordinator URL: %s", coordinatorUrl));
     System.out.println(String.format("Coordinator URL: %s", coordinatorUrl));
 
+    Optional<String> execEnvContainerId = Optional.ofNullable(System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID()));
+
     int delay = new Random().nextInt(SamzaContainer.DEFAULT_READ_JOBMODEL_DELAY_MS()) + 1;
     JobModel jobModel = SamzaContainer.readJobModel(coordinatorUrl, delay);
     Config config = jobModel.getConfig();
@@ -69,9 +74,12 @@ public class LocalContainerRunner {
     MDC.put("jobName", jobName);
     MDC.put("jobId", jobId);
 
+    DiagnosticsUtil.writeMetadataFile(jobName, jobId, containerId, execEnvContainerId, config);
+
     ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc =
         ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(config), config);
 
-    ContainerLaunchUtil.run(appDesc, containerId, jobModel);
+    ContainerLaunchUtil.run(appDesc, jobName, jobId, containerId, execEnvContainerId, jobModel);
   }
+
 }
diff --git a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
new file mode 100644
index 0000000..3c05228
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.util;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Optional;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.config.SystemConfig;
+import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.diagnostics.DiagnosticsManager;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.metrics.reporter.Metrics;
+import org.apache.samza.metrics.reporter.MetricsHeader;
+import org.apache.samza.metrics.reporter.MetricsSnapshot;
+import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
+import org.apache.samza.runtime.LocalContainerRunner;
+import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+
+public class DiagnosticsUtil {
+  private static final Logger log = LoggerFactory.getLogger(DiagnosticsUtil.class);
+
+  // Write a file in the samza.log.dir named {exec-env-container-id}.metadata that contains
+  // metadata about the container such as containerId, jobName, jobId, hostname, timestamp, version info, and others.
+  public static void writeMetadataFile(String jobName, String jobId, String containerId,
+      Optional<String> execEnvContainerId, Config config) {
+
+    Option<File> metadataFile = JobConfig.getMetadataFile(Option.apply(execEnvContainerId.orElse(null)));
+
+    if (metadataFile.isDefined()) {
+
+      StringBuilder metadata = new StringBuilder("Version: 1");
+      metadata.append(System.lineSeparator());
+      MetricsHeader metricsHeader =
+          new MetricsHeader(jobName, jobId, "samza-container-" + containerId, execEnvContainerId.orElse(""), LocalContainerRunner.class.getName(),
+              Util.getTaskClassVersion(config), Util.getSamzaVersion(), Util.getLocalHost().getHostName(),
+              System.currentTimeMillis(), System.currentTimeMillis());
+
+      MetricsSnapshot metricsSnapshot = new MetricsSnapshot(metricsHeader, new Metrics());
+      metadata.append("ContainerMetadata: ");
+      metadata.append(new String(new MetricsSnapshotSerdeV2().toBytes(metricsSnapshot)));
+      FileUtil.writeToTextFile(metadataFile.get(), metadata.toString(), false);
+    } else {
+      log.info("Skipping writing metadata file.");
+    }
+  }
+
+
+  /**
+   * Create a pair of DiagnosticsManager and Reporter for the given jobName, jobId, containerId, and execEnvContainerId,
+   * if diagnostics is enabled.
+   * execEnvContainerId is the ID assigned to the container by the cluster manager (e.g., YARN).
+   */
+  public static Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> buildDiagnosticsManager(String jobName, String jobId,
+      String containerId, Optional<String> execEnvContainerId, Config config) {
+
+    Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> diagnosticsManagerReporterPair = Optional.empty();
+
+    if (new JobConfig(config).getDiagnosticsEnabled()) {
+
+      // Diagnostic stream, producer, and reporter related parameters
+      String diagnosticsReporterName = MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS();
+      Integer publishInterval = new MetricsConfig(config).getMetricsSnapshotReporterInterval(diagnosticsReporterName);
+      String taskClassVersion = Util.getTaskClassVersion(config);
+      String samzaVersion = Util.getSamzaVersion();
+      String hostName = Util.getLocalHost().getHostName();
+      Option<String> blacklist = new MetricsConfig(config).getMetricsSnapshotReporterBlacklist(diagnosticsReporterName);
+      Option<String> diagnosticsReporterStreamName = new MetricsConfig(config).getMetricsSnapshotReporterStream(diagnosticsReporterName);
+
+      if (diagnosticsReporterStreamName.isEmpty()) {
+        throw new ConfigException("Missing required config: " + String.format(MetricsConfig.METRICS_SNAPSHOT_REPORTER_STREAM(), diagnosticsReporterName));
+      }
+
+      SystemStream diagnosticsSystemStream = StreamUtil.getSystemStreamFromNames(diagnosticsReporterStreamName.get());
+
+      Optional<String> diagnosticsSystemFactoryName = new SystemConfig(config).getSystemFactory(diagnosticsSystemStream.getSystem());
+      if (!diagnosticsSystemFactoryName.isPresent()) {
+        throw new SamzaException("Missing factory in config for system " + diagnosticsSystemStream.getSystem());
+      }
+
+      // Create a systemProducer for giving to diagnostic-reporter and diagnosticsManager
+      SystemFactory systemFactory = Util.getObj(diagnosticsSystemFactoryName.get(), SystemFactory.class);
+      SystemProducer systemProducer = systemFactory.getProducer(diagnosticsSystemStream.getSystem(), config, new MetricsRegistryMap());
+      DiagnosticsManager diagnosticsManager = new DiagnosticsManager(jobName, jobId, containerId, execEnvContainerId.orElse(""), taskClassVersion,
+          samzaVersion, hostName, diagnosticsSystemStream, systemProducer, Duration.ofMillis(new TaskConfigJava(config).getShutdownMs()));
+
+      MetricsSnapshotReporter diagnosticsReporter =
+          new MetricsSnapshotReporter(systemProducer, diagnosticsSystemStream, publishInterval, jobName, jobId,
+              "samza-container-" + containerId, taskClassVersion, samzaVersion, hostName, new MetricsSnapshotSerdeV2(),
+              blacklist, ScalaJavaUtil.toScalaFunction(() -> System.currentTimeMillis()));
+
+      diagnosticsManagerReporterPair = Optional.of(new ImmutablePair<>(diagnosticsManager, diagnosticsReporter));
+    }
+
+    return diagnosticsManagerReporterPair;
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java b/samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java
index e7eeb5e..8acb5e8 100644
--- a/samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java
+++ b/samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java
@@ -19,7 +19,9 @@
 package org.apache.samza.util;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.metrics.MetricsReporter;
@@ -37,11 +39,19 @@ public class MetricsReporterLoader {
   public static Map<String, MetricsReporter> getMetricsReporters(MetricsConfig config, String containerName) {
     Map<String, MetricsReporter> metricsReporters = new HashMap<>();
 
-    for (String metricsReporterName : JavaConverters.seqAsJavaListConverter(config.getMetricReporterNames()).asJava()) {
+    String diagnosticsReporterName = MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS();
+
+    // Exclude creation of the diagnostics-reporter, because it is created manually in SamzaContainer (to allow sharing
+    // of the sysProducer between the reporter and the diagnosticsManager).
+    List<String> metricsReporterNames = JavaConverters.seqAsJavaListConverter(config.getMetricReporterNames()).asJava().
+        stream().filter(reporterName -> !reporterName.equals(diagnosticsReporterName)).collect(Collectors.toList());
+
+    for (String metricsReporterName : metricsReporterNames) {
       String metricsFactoryClassName = config.getMetricsFactoryClass(metricsReporterName).get();
       if (metricsFactoryClassName == null) {
         throw new SamzaException(String.format("Metrics reporter %s missing .class config", metricsReporterName));
       }
+
       MetricsReporterFactory metricsReporterFactory = Util.getObj(metricsFactoryClassName, MetricsReporterFactory.class);
       metricsReporters.put(metricsReporterName,
                            metricsReporterFactory.getMetricsReporter(metricsReporterName,
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index fd97707..24140d9 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -114,6 +114,10 @@ object JobConfig {
   val DEFAULT_STANDBY_TASKS_REPLICATION_FACTOR = 1
   val SYSTEM_STREAM_PARTITION_MAPPER_FACTORY = "job.system.stream.partition.mapper.factory"
 
+  // Naming format and directory for container.metadata file
+  private val CONTAINER_METADATA_FILENAME_FORMAT = "%s.metadata" // Filename: <containerID>.metadata
+  private val CONTAINER_METADATA_DIRECTORY_SYS_PROPERTY = "samza.log.dir"
+
   implicit def Config2Job(config: Config) = new JobConfig(config)
 
   /**
@@ -132,6 +136,18 @@ object JobConfig {
     }
     fwkPath
   }
+
+  /**
+   * Resolves the location of the container metadata file.
+   *
+   * The metadata file is written as <exec-env-container-id>.metadata in the log-dir of the container (read from the
+   * "samza.log.dir" system property). Here <exec-env-container-id> refers to the ID assigned by the cluster manager
+   * (e.g., YARN) to the container, which uniquely identifies a container's lifecycle.
+   *
+   * @param execEnvContainerId the ID assigned to the container by the cluster manager, if any
+   * @return the metadata file, or None when the system property is unset or no execEnvContainerId is given
+   */
+  def getMetadataFile(execEnvContainerId: Option[String]): Option[File] = {
+    val logDir = Option(System.getProperty(JobConfig.CONTAINER_METADATA_DIRECTORY_SYS_PROPERTY))
+    for {
+      dir <- logDir
+      id <- execEnvContainerId
+    } yield new File(dir, String.format(JobConfig.CONTAINER_METADATA_FILENAME_FORMAT, id))
+  }
 }
 
 class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
@@ -207,8 +223,6 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
 
   def getRegexResolvedSystem(rewriterName: String) = getOption(JobConfig.REGEX_RESOLVED_SYSTEM format rewriterName)
 
-
-
   def getStreamJobFactoryClass = getOption(JobConfig.STREAM_JOB_FACTORY_CLASS)
 
   def getJobId = getOption(JobConfig.JOB_ID).getOrElse("1")
diff --git a/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala
index da29013..d41f5fb 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala
@@ -49,7 +49,7 @@ class MetricsConfig(config: Config) extends ScalaMapConfig(config) {
 
   def getMetricsSnapshotReporterStream(name: String): Option[String] = getOption(MetricsConfig.METRICS_SNAPSHOT_REPORTER_STREAM format name)
 
-  def getMetricsSnapshotReporterInterval(name: String): Option[String] = getOption(MetricsConfig.METRICS_SNAPSHOT_REPORTER_INTERVAL format name)
+  def getMetricsSnapshotReporterInterval(name: String): Int = getOption(MetricsConfig.METRICS_SNAPSHOT_REPORTER_INTERVAL format name).getOrElse("60").toInt
 
   def getMetricsSnapshotReporterBlacklist(name: String): Option[String] = getOption(MetricsConfig.METRICS_SNAPSHOT_REPORTER_BLACKLIST format name)
 
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index ba5d932..9516327 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -96,11 +96,6 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) with Logging {
     case _ => TaskConfig.DEFAULT_COMMIT_MS
   }
 
-  def getShutdownMs: Option[Long] = getOption(TaskConfig.SHUTDOWN_MS) match {
-    case Some(ms) => Some(ms.toLong)
-    case _ => None
-  }
-
   def getTaskClass = getOption(TaskConfig.TASK_CLASS)
 
   def getCommandClass = getOption(TaskConfig.COMMAND_BUILDER)
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 068d494..c38274b 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -42,6 +42,7 @@ import org.apache.samza.container.disk.DiskSpaceMonitor.Listener
 import org.apache.samza.container.disk.{DiskQuotaPolicyFactory, DiskSpaceMonitor, NoThrottlingDiskQuotaPolicyFactory, PollingScanDiskSpaceMonitor}
 import org.apache.samza.container.host.{StatisticsMonitorImpl, SystemMemoryStatistics, SystemStatisticsMonitor}
 import org.apache.samza.context._
+import org.apache.samza.diagnostics.DiagnosticsManager
 import org.apache.samza.job.model.{ContainerModel, JobModel, TaskMode}
 import org.apache.samza.metrics.{JmxServer, JvmMetrics, MetricsRegistryMap, MetricsReporter}
 import org.apache.samza.serializers._
@@ -134,11 +135,14 @@ object SamzaContainer extends Logging {
     applicationTaskContextFactoryOption: Option[ApplicationTaskContextFactory[ApplicationTaskContext]],
     externalContextOption: Option[ExternalContext],
     localityManager: LocalityManager = null,
-    startpointManager: StartpointManager = null) = {
+    startpointManager: StartpointManager = null,
+    diagnosticsManager: Option[DiagnosticsManager] = Option.empty) = {
     val config = jobContext.getConfig
     val systemConfig = new SystemConfig(config)
     val containerModel = jobModel.getContainers.get(containerId)
     val containerName = "samza-container-%s" format containerId
+    val jobName = config.getName.get
+    val jobId = config.getJobId
     val maxChangeLogStreamPartitions = jobModel.maxChangeLogStreamPartitions
 
     val containerPID = ManagementFactory.getRuntimeMXBean().getName()
@@ -651,7 +655,8 @@ object SamzaContainer extends Logging {
       containerContext = containerContext,
       applicationContainerContextOption = applicationContainerContextOption,
       externalContextOption = externalContextOption,
-      containerStorageManager = containerStorageManager)
+      containerStorageManager = containerStorageManager,
+      diagnosticsManager = diagnosticsManager)
   }
 
   /**
@@ -689,9 +694,10 @@ class SamzaContainer(
   containerContext: ContainerContext,
   applicationContainerContextOption: Option[ApplicationContainerContext],
   externalContextOption: Option[ExternalContext],
-  containerStorageManager: ContainerStorageManager) extends Runnable with Logging {
+  containerStorageManager: ContainerStorageManager,
+  diagnosticsManager: Option[DiagnosticsManager] = Option.empty) extends Runnable with Logging {
 
-  val shutdownMs = config.getShutdownMs.getOrElse(TaskConfigJava.DEFAULT_TASK_SHUTDOWN_MS)
+  val shutdownMs = new TaskConfigJava(config).getShutdownMs
   var shutdownHookThread: Thread = null
   var jmxServer: JmxServer = null
 
@@ -775,6 +781,7 @@ class SamzaContainer(
       shutdownHostStatisticsMonitor
       shutdownProducers
       shutdownOffsetManager
+      shutdownDiagnostics
       shutdownMetrics
       shutdownSecurityManger
       shutdownAdmins
@@ -867,25 +874,9 @@ class SamzaContainer(
   }
 
   def startDiagnostics {
-    if (config.getDiagnosticsEnabled) {
-      info("Starting diagnostics.")
-
-      try {
-        var diagnosticsAppender = Util.getObj("org.apache.samza.logging.log4j.SimpleDiagnosticsAppender", (classOf[SamzaContainerMetrics], this.metrics))
-        info("Attached log4j diagnostics appender.")
-      }
-      catch {
-        case e@(_: ClassNotFoundException | _: InstantiationException | _: InvocationTargetException) => {
-          try {
-            val diagnosticsAppender = Util.getObj("org.apache.samza.logging.log4j2.SimpleDiagnosticsAppender", (classOf[SamzaContainerMetrics], this.metrics))
-            info("Attached log4j2 diagnostics appender.")
-          } catch {
-            case e@(_: ClassNotFoundException | _: InstantiationException | _: InvocationTargetException) => {
-              warn("Failed to instantiate neither diagnostic appender for sending error information to diagnostics stream", e)
-            }
-          }
-        }
-      }
+    if (diagnosticsManager.isDefined) {
+      info("Starting diagnostics manager.")
+      diagnosticsManager.get.start()
     }
   }
 
@@ -1076,6 +1067,13 @@ class SamzaContainer(
     offsetManager.stop
   }
 
+  /** Stops the diagnostics manager; does nothing when this container was created without one. */
+  def shutdownDiagnostics {
+    diagnosticsManager.foreach { manager =>
+      info("Shutting down diagnostics manager.")
+      manager.stop()
+    }
+  }
+
   def shutdownMetrics {
     info("Shutting down metrics reporters.")
 
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
index 326156b..56f2ab6 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
@@ -49,8 +49,6 @@ class SamzaContainerMetrics(
 
   val taskStoreRestorationMetrics: util.Map[TaskName, Gauge[Long]] = new util.HashMap[TaskName, Gauge[Long]]()
 
-  val exceptions = newListGauge[DiagnosticsExceptionEvent]("exceptions")
-
   def addStoresRestorationGauge(taskName: TaskName) {
     taskStoreRestorationMetrics.put(taskName, newGauge("%s-restore-time" format(taskName.toString), -1L))
   }
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/ListGauge.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/BoundedList.java
similarity index 77%
rename from samza-api/src/main/java/org/apache/samza/metrics/ListGauge.java
rename to samza-core/src/main/scala/org/apache/samza/diagnostics/BoundedList.java
index d0fb326..ad76c28 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/ListGauge.java
+++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/BoundedList.java
@@ -16,12 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.samza.metrics;
+package org.apache.samza.diagnostics;
 
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.stream.Collectors;
@@ -29,17 +30,17 @@ import org.apache.samza.util.TimestampedValue;
 
 
 /**
- * A {@link ListGauge} is a {@link Metric} that buffers multiple instances of a type T in a list.
- * {@link ListGauge}s are useful for maintaining, recording, or collecting values over time.
+ * A {@link BoundedList} buffers multiple instances of a type T in a list.
+ * {@link BoundedList}s are useful for maintaining, recording, or collecting values over time.
  * For example, a set of specific logging-events (e.g., errors).
  *
  * Eviction is controlled by parameters (maxNumberOfItems and maxStaleness), which are set during instantiation.
- * Eviction happens during element addition or during reads of the ListGauge (getValues).
+ * Eviction happens during element addition or during reads of the BoundedList (getValues).
  *
  * All public methods are thread-safe.
  *
  */
-public class ListGauge<T> implements Metric {
+public class BoundedList<T> {
   private final String name;
   private final Queue<TimestampedValue<T>> elements;
 
@@ -49,13 +50,13 @@ public class ListGauge<T> implements Metric {
   private final static Duration DEFAULT_MAX_STALENESS = Duration.ofMinutes(60);
 
   /**
-   * Create a new {@link ListGauge} that auto evicts based on the given maxNumberOfItems, maxStaleness, and period parameters.
+   * Create a new {@link BoundedList} that auto evicts based on the given maxNumberOfItems, maxStaleness, and period parameters.
    *
    * @param name Name to be assigned
-   * @param maxNumberOfItems The max number of items that can remain in the listgauge
-   * @param maxStaleness The max staleness of items permitted in the listgauge
+   * @param maxNumberOfItems The max number of items that can remain in the list
+   * @param maxStaleness The max staleness of items permitted in the list
    */
-  public ListGauge(String name, int maxNumberOfItems, Duration maxStaleness) {
+  public BoundedList(String name, int maxNumberOfItems, Duration maxStaleness) {
     this.name = name;
     this.elements = new ConcurrentLinkedQueue<TimestampedValue<T>>();
     this.maxNumberOfItems = maxNumberOfItems;
@@ -63,15 +64,15 @@ public class ListGauge<T> implements Metric {
   }
 
   /**
-   * Create a new {@link ListGauge} that auto evicts upto a max of 100 items and a max-staleness of 60 minutes.
+   * Create a new {@link BoundedList} that auto evicts upto a max of 100 items and a max-staleness of 60 minutes.
    * @param name Name to be assigned
    */
-  public ListGauge(String name) {
+  public BoundedList(String name) {
     this(name, DEFAULT_MAX_NITEMS, DEFAULT_MAX_STALENESS);
   }
 
   /**
-   * Get the name assigned to this {@link ListGauge}
+   * Get the name assigned to this {@link BoundedList}
    * @return the assigned name
    */
   public String getName() {
@@ -100,11 +101,17 @@ public class ListGauge<T> implements Metric {
   }
 
   /**
-   * {@inheritDoc}
+   * Removes the given elements from the list.
+   * @param elementsToRemove collection of elements to remove.
    */
-  @Override
-  public void visit(MetricsVisitor visitor) {
-    visitor.listGauge(this);
+  public void remove(Collection<T> elementsToRemove) {
+    Iterator<TimestampedValue<T>> iterator = this.elements.iterator();
+    while (iterator.hasNext()) {
+      TimestampedValue<T> value = iterator.next();
+      if (elementsToRemove.contains(value.getValue())) {
+        iterator.remove();
+      }
+    }
   }
 
   /**
diff --git a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsExceptionEvent.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsExceptionEvent.java
index 5a6be1e..cdae352 100644
--- a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsExceptionEvent.java
+++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsExceptionEvent.java
@@ -26,8 +26,6 @@ import org.apache.commons.lang3.exception.ExceptionUtils;
 
 /**
  * This class encapsulates information related to an exception event that is useful for diagnostics.
- * It used to define container, task, and other metrics as
- * {@link org.apache.samza.metrics.ListGauge} of type {@link DiagnosticsExceptionEvent}.
  */
 public class DiagnosticsExceptionEvent {
 
diff --git a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
new file mode 100644
index 0000000..daaefe4
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.diagnostics;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.lang.reflect.InvocationTargetException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.samza.metrics.reporter.Metrics;
+import org.apache.samza.metrics.reporter.MetricsHeader;
+import org.apache.samza.metrics.reporter.MetricsSnapshot;
+import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.DiagnosticsUtil;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+
+
+/**
+ * Responsible for publishing data to the diagnostic stream.
+ * Currently emits exception/error events obtained using a customer-appender that attaches to the root-logger.
+ */
+public class DiagnosticsManager {
+  private static final Logger LOG = LoggerFactory.getLogger(DiagnosticsManager.class);
+  private static final Duration DEFAULT_PUBLISH_PERIOD = Duration.ofSeconds(60);
+  // Period size for pushing data to the diagnostic stream
+
+  private static final String PUBLISH_THREAD_NAME = "DiagnosticsManager Thread-%d";
+  private static final String METRICS_GROUP_NAME = "org.apache.samza.container.SamzaContainerMetrics";
+  // Using SamzaContainerMetrics as the group name to maintain compatibility with existing diagnostics
+
+  // Parameters used for populating the MetricHeader when sending diagnostic-stream messages
+  private final String jobName;
+  private final String jobId;
+  private final String containerId;
+  private final String executionEnvContainerId;
+  private final String taskClassVersion;
+  private final String samzaVersion;
+  private final String hostname;
+  private final Instant resetTime;
+
+  private SystemProducer systemProducer; // SystemProducer for writing diagnostics data
+  private BoundedList<DiagnosticsExceptionEvent> exceptions; // A BoundedList for storing DiagnosticExceptionEvent
+  private final ScheduledExecutorService scheduler; // Scheduler for pushing data to the diagnostic stream
+  private final Duration terminationDuration; // duration to wait when terminating the scheduler
+  private final SystemStream diagnosticSystemStream;
+
+  public DiagnosticsManager(String jobName, String jobId, String containerId, String executionEnvContainerId,
+      String taskClassVersion, String samzaVersion, String hostname, SystemStream diagnosticSystemStream,
+      SystemProducer systemProducer, Duration terminationDuration) {
+    this.jobName = jobName;
+    this.jobId = jobId;
+    this.containerId = containerId;
+    this.executionEnvContainerId = executionEnvContainerId;
+    this.taskClassVersion = taskClassVersion;
+    this.samzaVersion = samzaVersion;
+    this.hostname = hostname;
+    resetTime = Instant.now();
+
+    this.systemProducer = systemProducer;
+    this.diagnosticSystemStream = diagnosticSystemStream;
+
+    this.exceptions = new BoundedList<>("exceptions"); // Create a BoundedList with default size and time parameters
+    this.scheduler = Executors.newSingleThreadScheduledExecutor(
+        new ThreadFactoryBuilder().setNameFormat(PUBLISH_THREAD_NAME).setDaemon(true).build());
+    this.terminationDuration = terminationDuration;
+
+    try {
+
+      Util.getObj("org.apache.samza.logging.log4j.SimpleDiagnosticsAppender",
+          JavaConverters.collectionAsScalaIterableConverter(
+              Collections.singletonList(new Tuple2<Class<?>, Object>(DiagnosticsManager.class, this)))
+              .asScala()
+              .toSeq());
+
+      LOG.info("Attached log4j diagnostics appender.");
+    } catch (ClassNotFoundException | InstantiationException | InvocationTargetException e) {
+      try {
+        Util.getObj("org.apache.samza.logging.log4j2.SimpleDiagnosticsAppender",
+            JavaConverters.collectionAsScalaIterableConverter(
+                Collections.singletonList(new Tuple2<Class<?>, Object>(DiagnosticsManager.class, this)))
+                .asScala()
+                .toSeq());
+        LOG.info("Attached log4j diagnostics appender.");
+      } catch (ClassNotFoundException | InstantiationException | InvocationTargetException ex) {
+
+        LOG.warn(
+            "Failed to instantiate neither diagnostic appender for sending error information to diagnostics stream.",
+            ex);
+      }
+    }
+  }
+
+  public void start() {
+    this.scheduler.scheduleWithFixedDelay(new DiagnosticsStreamPublisher(), 0, DEFAULT_PUBLISH_PERIOD.getSeconds(),
+        TimeUnit.SECONDS);
+  }
+
+  public void stop() throws InterruptedException {
+    scheduler.shutdown();
+
+    // Allow any scheduled publishes to finish, and block for termination
+    scheduler.awaitTermination(terminationDuration.toMillis(), TimeUnit.MILLISECONDS);
+
+    if (!scheduler.isTerminated()) {
+      LOG.warn("Unable to terminate scheduler");
+      scheduler.shutdownNow();
+    }
+  }
+
+  public void addExceptionEvent(DiagnosticsExceptionEvent diagnosticsExceptionEvent) {
+    this.exceptions.add(diagnosticsExceptionEvent);
+  }
+
+  private class DiagnosticsStreamPublisher implements Runnable {
+
+    @Override
+    public void run() {
+      // Publish exception events if there are any
+      Collection<DiagnosticsExceptionEvent> exceptionList = exceptions.getValues();
+
+      if (!exceptionList.isEmpty()) {
+
+        // Create the metricHeader
+        MetricsHeader metricsHeader = new MetricsHeader(jobName, jobId, "samza-container-" + containerId, executionEnvContainerId,
+            DiagnosticsUtil.class.getName(), taskClassVersion, samzaVersion, hostname, System.currentTimeMillis(),
+            resetTime.toEpochMilli());
+
+        Map<String, Map<String, Object>> metricsMessage = new HashMap<>();
+        metricsMessage.putIfAbsent(METRICS_GROUP_NAME, new HashMap<>());
+        metricsMessage.get(METRICS_GROUP_NAME).put(exceptions.getName(), exceptionList);
+        MetricsSnapshot metricsSnapshot = new MetricsSnapshot(metricsHeader, new Metrics(metricsMessage));
+
+        try {
+          systemProducer.send(DiagnosticsManager.class.getName(),
+              new OutgoingMessageEnvelope(diagnosticSystemStream, metricsHeader.getHost(), null,
+                  new MetricsSnapshotSerdeV2().toBytes(metricsSnapshot)));
+
+          // Remove exceptions from list after successful publish to diagnostics stream
+          exceptions.remove(exceptionList);
+        } catch (Exception e) {
+          LOG.error("Exception when flushing exceptions", e);
+        }
+      }
+    }
+  }
+}
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
index 30c8d1d..6d6b1b7 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
@@ -21,68 +21,45 @@ package org.apache.samza.metrics
 
 import org.apache.samza.clustermanager.SamzaApplicationState
 import org.apache.samza.config.{ClusterManagerConfig, Config}
-import org.apache.samza.config.MetricsConfig.Config2Metrics
 import org.apache.samza.util.Logging
-import org.apache.samza.util.MetricsReporterLoader
-
-import scala.collection.JavaConverters._
-
-object ContainerProcessManagerMetrics {
-  val sourceName = "ApplicationMaster"
-}
 
 /**
- * Responsible for wiring up Samza's metrics. Given that Samza has a metric
- * registry, we might as well use it. This class takes Samza's application
- * master state, and converts it to metrics.
- */
-class ContainerProcessManagerMetrics(
-                                      val config: Config,
-                                      val state: SamzaApplicationState,
-                                      val registry: ReadableMetricsRegistry) extends MetricsHelper  with Logging {
-
-  val jvm = new JvmMetrics(registry)
-  val reporters = MetricsReporterLoader.getMetricsReporters(config, ContainerProcessManagerMetrics.sourceName).asScala
+  * Responsible for wiring up Samza's metrics. Given that Samza has a metric
+  * registry, we might as well use it. This class takes Samza's application
+  * master state, and converts it to metrics.
+  */
+class ContainerProcessManagerMetrics(val config: Config,
+  val state: SamzaApplicationState,
+  val registry: ReadableMetricsRegistry) extends MetricsHelper with Logging {
   val clusterManagerConfig = new ClusterManagerConfig(config)
 
-  reporters.values.foreach(_.register(ContainerProcessManagerMetrics.sourceName, registry))
-
-   def start() {
-    val mRunningContainers = newGauge("running-containers", () => state.runningProcessors.size)
-    val mNeededContainers = newGauge("needed-containers", () => state.neededProcessors.get())
-    val mCompletedContainers = newGauge("completed-containers", () => state.completedProcessors.get())
-    val mFailedContainers = newGauge("failed-containers", () => state.failedContainers.get())
-    val mReleasedContainers = newGauge("released-containers", () => state.releasedContainers.get())
-    val mContainers = newGauge("container-count", () => state.processorCount)
-    val mRedundantNotifications = newGauge("redundant-notifications", () => state.redundantNotifications.get())
-    val mJobHealthy = newGauge("job-healthy", () => if (state.jobHealthy.get()) 1 else 0)
-    val mPreferredHostRequests = newGauge("preferred-host-requests", () => state.preferredHostRequests.get())
-    val mAnyHostRequests = newGauge("any-host-requests", () => state.anyHostRequests.get())
-    val mExpiredPreferredHostRequests = newGauge("expired-preferred-host-requests", () => state.expiredPreferredHostRequests.get())
-    val mExpiredAnyHostRequests = newGauge("expired-any-host-requests", () => state.expiredAnyHostRequests.get())
-
-    val mHostAffinityMatchPct = newGauge("host-affinity-match-pct", () => {
-      val numPreferredHostRequests = state.preferredHostRequests.get()
-      val numExpiredPreferredHostRequests = state.expiredPreferredHostRequests.get()
-      if (numPreferredHostRequests != 0) {
-            100.00 * (numPreferredHostRequests - numExpiredPreferredHostRequests) / numPreferredHostRequests
-          } else {
-            0L
-          }
-      })
-
-     val mFailedStandbyAllocations = newGauge("failed-standby-allocations", () => state.failedStandbyAllocations.get())
-     val mFailoversToAnyHost = newGauge("failovers-to-any-host", () => state.failoversToAnyHost.get())
-     val mFailoversToStandby = newGauge("failovers-to-standby", () => state.failoversToStandby.get())
+  val mRunningContainers = newGauge("running-containers", () => state.runningProcessors.size)
+  val mNeededContainers = newGauge("needed-containers", () => state.neededProcessors.get())
+  val mCompletedContainers = newGauge("completed-containers", () => state.completedProcessors.get())
+  val mFailedContainers = newGauge("failed-containers", () => state.failedContainers.get())
+  val mReleasedContainers = newGauge("released-containers", () => state.releasedContainers.get())
+  val mContainers = newGauge("container-count", () => state.processorCount)
+  val mRedundantNotifications = newGauge("redundant-notifications", () => state.redundantNotifications.get())
+  val mJobHealthy = newGauge("job-healthy", () => if (state.jobHealthy.get()) 1 else 0)
+  val mPreferredHostRequests = newGauge("preferred-host-requests", () => state.preferredHostRequests.get())
+  val mAnyHostRequests = newGauge("any-host-requests", () => state.anyHostRequests.get())
+  val mExpiredPreferredHostRequests = newGauge("expired-preferred-host-requests", () => state.expiredPreferredHostRequests.get())
+  val mExpiredAnyHostRequests = newGauge("expired-any-host-requests", () => state.expiredAnyHostRequests.get())
 
-     val mContainerMemoryMb = newGauge("container-memory-mb", () => clusterManagerConfig.getContainerMemoryMb)
-     val mContainerCpuCores = newGauge("container-cpu-cores", () => clusterManagerConfig.getNumCores)
+  val mHostAffinityMatchPct = newGauge("host-affinity-match-pct", () => {
+    val numPreferredHostRequests = state.preferredHostRequests.get()
+    val numExpiredPreferredHostRequests = state.expiredPreferredHostRequests.get()
+    if (numPreferredHostRequests != 0) {
+      100.00 * (numPreferredHostRequests - numExpiredPreferredHostRequests) / numPreferredHostRequests
+    } else {
+      0L
+    }
+  })
 
-    jvm.start
-    reporters.values.foreach(_.start)
-  }
+  val mFailedStandbyAllocations = newGauge("failed-standby-allocations", () => state.failedStandbyAllocations.get())
+  val mFailoversToAnyHost = newGauge("failovers-to-any-host", () => state.failoversToAnyHost.get())
+  val mFailoversToStandby = newGauge("failovers-to-standby", () => state.failoversToStandby.get())
 
-   def stop() {
-    reporters.values.foreach(_.stop)
-  }
+  val mContainerMemoryMb = newGauge("container-memory-mb", () => clusterManagerConfig.getContainerMemoryMb)
+  val mContainerCpuCores = newGauge("container-cpu-cores", () => clusterManagerConfig.getNumCores)
 }
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
index 21ec763..6e1fb8c 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
@@ -40,8 +40,6 @@ trait MetricsHelper {
 
   def newGauge[T](name: String, value: T) = metricGroup.newGauge[T](name,value)
 
-  def newListGauge[T](name: String) = metricGroup.newListGauge[T](name)
-
   /**
    * Specify a dynamic gauge that always returns the latest value when polled.
    * The value closure must be thread safe, since metrics reporters may access
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
index 75ed6aa..40ffee2 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
@@ -75,21 +75,6 @@ class MetricsRegistryMap(val name: String) extends ReadableMetricsRegistry with
     newTimer(group, new Timer(name))
   }
 
-  /**
-    * Register a {@link org.apache.samza.metrics.ListGauge}
-    *
-    * @param group     Group for this ListGauge
-    * @param listGauge the ListGauge to register
-    * @tparam T the type of the list gauge
-    */
-  def newListGauge[T](group: String, listGauge: ListGauge[T]) = {
-    debug("Adding new listgauge %s %s %s." format(group, listGauge.getName, listGauge))
-    putAndGetGroup(group).putIfAbsent(listGauge.getName, listGauge)
-    val realListGauge = metrics.get(group).get(listGauge.getName).asInstanceOf[ListGauge[T]]
-    listeners.foreach(_.onListGauge(group, realListGauge))
-    realListGauge
-  }
-
   private def putAndGetGroup(group: String) = {
     metrics.putIfAbsent(group, new ConcurrentHashMap[String, Metric])
     metrics.get(group)
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
index b48aaf7..84a487a 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
@@ -25,7 +25,6 @@ import java.util
 import org.apache.samza.util.Logging
 import javax.management.MBeanServer
 import javax.management.ObjectName
-
 import org.apache.samza.config.Config
 import org.apache.samza.metrics._
 
@@ -49,8 +48,6 @@ class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging {
               def counter(counter: Counter) = registerBean(new JmxCounter(counter, getObjectName(group, name, sources(registry))))
               def gauge[T](gauge: Gauge[T]) = registerBean(new JmxGauge(gauge.asInstanceOf[Gauge[Object]], getObjectName(group, name, sources(registry))))
               def timer(timer: Timer) = registerBean(new JmxTimer(timer, getObjectName(group, name, sources(registry))))
-              def listGauge[T](listGauge: ListGauge[T]) = registerBean(new JmxListGauge(listGauge.asInstanceOf[ListGauge[Object]], getObjectName(group, name, sources(registry))))
-
             })
         }
       })
@@ -70,9 +67,6 @@ class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging {
         def onTimer(group: String, timer: Timer) {
           registerBean(new JmxTimer(timer, getObjectName(group, timer.getName, source)))
         }
-        def onListGauge(group: String, listGauge: ListGauge[_]) {
-          registerBean(new JmxListGauge(listGauge.asInstanceOf[ListGauge[Object]], getObjectName(group, listGauge.getName, source)))
-        }
       }
     } else {
       warn("Trying to re-register a registry for source %s. Ignoring." format source)
@@ -105,20 +99,11 @@ trait JmxGaugeMBean extends MetricMBean {
   def getValue(): Object
 }
 
-trait JmxListGaugeMBean extends MetricMBean {
-  def getValue(): util.Collection[Object]
-}
-
 class JmxGauge(g: org.apache.samza.metrics.Gauge[Object], on: ObjectName) extends JmxGaugeMBean {
   def getValue = g.getValue
   def objectName = on
 }
 
-class JmxListGauge(g: org.apache.samza.metrics.ListGauge[Object], on: ObjectName) extends JmxListGaugeMBean {
-  def getValue = g.getValues
-  def objectName = on
-}
-
 trait JmxCounterMBean extends MetricMBean {
   def getCount(): Long
 }
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
index 8c13840..f9cf819 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
@@ -122,8 +122,6 @@ class MetricsSnapshotReporter(
           case (name, metric) =>
             if (!shouldIgnore(group, name)) {
               metric.visit(new MetricsVisitor {
-                // for listGauge the value is returned as a list, which gets serialized
-                def listGauge[T](listGauge: ListGauge[T]) = { groupMsg.put(name, listGauge.getValues) }
                 def counter(counter: Counter) = groupMsg.put(name, counter.getCount: java.lang.Long)
                 def gauge[T](gauge: Gauge[T]) = groupMsg.put(name, gauge.getValue.asInstanceOf[Object])
                 def timer(timer: Timer) = groupMsg.put(name, timer.getSnapshot().getAverage(): java.lang.Double)
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
index 2191572..957cd6f 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
@@ -45,24 +45,6 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging
     val jobId = config
       .getJobId
 
-    val version =
-      try {
-        val taskClass = Option(new ApplicationConfig(config).getAppClass())
-          .orElse(config.getTaskClass).get
-        Option(Class.forName(taskClass).getPackage.getImplementationVersion).get
-      } catch {
-        case e: Exception => {
-          warn("Unable to find implementation version in jar's meta info. Defaulting to 0.0.1.")
-          "0.0.1"
-        }
-      }
-
-    val samzaVersion = Option(classOf[MetricsSnapshotReporterFactory].getPackage.getImplementationVersion)
-      .getOrElse({
-        warn("Unable to find implementation samza version in jar's meta info. Defaulting to 0.0.1.")
-        "0.0.1"
-      })
-
     val metricsSystemStreamName = config
       .getMetricsSnapshotReporterStream(name)
       .getOrElse(throw new SamzaException("No metrics stream defined in config."))
@@ -104,7 +86,6 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging
 
     val pollingInterval: Int = config
       .getMetricsSnapshotReporterInterval(name)
-      .getOrElse("60").toInt
 
     info("Setting polling interval to %d" format pollingInterval)
 
@@ -118,8 +99,8 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging
       jobName,
       jobId,
       containerName,
-      version,
-      samzaVersion,
+      Util.getTaskClassVersion(config),
+      Util.getSamzaVersion(),
       Util.getLocalHost.getHostName,
       serde, blacklist)
 
diff --git a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala
index 0845b5c..98b1447 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala
@@ -21,13 +21,13 @@
 
 package org.apache.samza.util
 
-import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
+import java.io._
 import java.nio.file._
 import java.util.zip.CRC32
 
 import org.apache.samza.util.Util.info
 
-object FileUtil {
+object FileUtil extends Logging {
   /**
     * Writes checksum & data to a file
     * Checksum is pre-fixed to the data and is a 32-bit long type data.
@@ -51,11 +51,50 @@ object FileUtil {
     }
 
     //atomic swap of tmp and real offset file
+    swapFiles(tmpFile, file)
+  }
+
+  /**
+    * Writes the data to a text file, staging it in a sibling ".tmp" file that is then moved over the target file
+    * @param file The file handle to write to
+    * @param data The data to be written to the file
+    * @param append true for appending data to file, false otherwise
+    * */
+  def writeToTextFile(file: File, data: String, append: Boolean): Unit = {
+
+    val tmpFilePath = file.getAbsolutePath + ".tmp"
+    var fileWriter: FileWriter = null
+    val tmpFile = new File(tmpFilePath)
+
+    //when appending, first move the existing file (if any) to the tmp path so the new data is added after its contents
+    if (append) {
+      swapFiles(file, tmpFile)
+    }
+
+    try {
+      fileWriter = new FileWriter(tmpFile, append)
+      fileWriter.write(data)
+    } catch {
+      case e: Exception =>
+        error("Error in writing to file %s isAppend %s" format (file, append))
+        System.out.println(e)
+    } finally {
+      fileWriter.close()
+    }
+
+    //move the tmp file over the real file (atomic where supported)
+    swapFiles(tmpFile, file)
+  }
+
+  private def swapFiles(source: File, destination: File) : Unit = {
+    //move source over destination if source exists; atomic where supported, plain replace otherwise
     try {
-      Files.move(tmpFile.toPath, file.toPath, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING)
+      if (source.exists()) {
+        Files.move(source.toPath, destination.toPath, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING)
+      }
     } catch {
       case e: AtomicMoveNotSupportedException =>
-        Files.move(tmpFile.toPath, file.toPath, StandardCopyOption.REPLACE_EXISTING)
+        Files.move(source.toPath, destination.toPath, StandardCopyOption.REPLACE_EXISTING)
     }
   }
 
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 1bb6648..5f5498d 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -30,6 +30,7 @@ import java.net.InetAddress
 import java.net.NetworkInterface
 import java.util.Random
 
+
 import scala.collection.JavaConverters._
 
 
@@ -64,13 +65,37 @@ object Util extends Logging {
     }
   }
 
+  def getSamzaVersion(): String = {
+    Option(this.getClass.getPackage.getImplementationVersion)
+      .getOrElse({
+        warn("Unable to find implementation samza version in jar's meta info. Defaulting to 0.0.1.")
+        "0.0.1"
+      })
+  }
+
+  def getTaskClassVersion(config: Config): String = {
+    try {
+      val taskClass = Option(new ApplicationConfig(config).getAppClass())
+        .orElse(new TaskConfig(config).getTaskClass).get
+      Class.forName(taskClass).getPackage.getImplementationVersion
+    } catch {
+      case e: Exception => {
+        warn("Unable to find implementation version in jar's meta info. Defaulting to 0.0.1.")
+        "0.0.1"
+      }
+    }
+  }
+
   /**
     * Instantiate an object from given className, and given constructor parameters.
     */
-  def getObj[T](className: String, constructorParams: (Class[_], Object)*) : T = {
+  @throws[ClassNotFoundException]
+  @throws[InstantiationException]
+  @throws[InvocationTargetException]
+  def getObj(className: String, constructorParams: (Class[_], Object)*) = {
     try {
       Class.forName(className).getDeclaredConstructor(constructorParams.map(x => x._1): _*)
-        .newInstance(constructorParams.map(x => x._2): _*).asInstanceOf[T]
+        .newInstance(constructorParams.map(x => x._2): _*)
     } catch {
       case e@(_: ClassNotFoundException | _: InstantiationException | _: InvocationTargetException) => {
         warn("Could not instantiate an instance for class %s." format className, e)
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index 74ea12a..0400b2e 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -20,6 +20,7 @@
 package org.apache.samza.clustermanager;
 
 import com.google.common.collect.ImmutableMap;
+import java.util.Optional;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
@@ -127,8 +128,7 @@ public class TestContainerProcessManager {
         new MapConfig(conf),
         state,
         new MetricsRegistryMap(),
-        clusterResourceManager
-    );
+        clusterResourceManager, Optional.empty());
 
     AbstractContainerAllocator allocator =
         (AbstractContainerAllocator) getPrivateFieldFromTaskManager("containerAllocator", taskManager).get(taskManager);
@@ -149,8 +149,7 @@ public class TestContainerProcessManager {
         new MapConfig(conf),
         state,
         new MetricsRegistryMap(),
-        clusterResourceManager
-    );
+        clusterResourceManager, Optional.empty());
 
     allocator =
         (AbstractContainerAllocator) getPrivateFieldFromTaskManager("containerAllocator", taskManager).get(taskManager);
@@ -170,8 +169,7 @@ public class TestContainerProcessManager {
         new MapConfig(conf),
         state,
         new MetricsRegistryMap(),
-        clusterResourceManager
-    );
+        clusterResourceManager, Optional.empty());
 
     MockContainerAllocator allocator = new MockContainerAllocator(
         clusterResourceManager,
@@ -214,8 +212,7 @@ public class TestContainerProcessManager {
         new MapConfig(conf),
         state,
         new MetricsRegistryMap(),
-        clusterResourceManager
-    );
+        clusterResourceManager, Optional.empty());
     taskManager.start();
 
     Thread allocatorThread = (Thread) getPrivateFieldFromTaskManager("allocatorThread", taskManager).get(taskManager);
@@ -240,8 +237,7 @@ public class TestContainerProcessManager {
         new MapConfig(conf),
         state,
         new MetricsRegistryMap(),
-        clusterResourceManager
-    );
+        clusterResourceManager, Optional.empty());
 
     MockContainerAllocator allocator = new MockContainerAllocator(
         clusterResourceManager,
@@ -290,8 +286,7 @@ public class TestContainerProcessManager {
         new MapConfig(conf),
         state,
         new MetricsRegistryMap(),
-        clusterResourceManager
-    );
+        clusterResourceManager, Optional.empty());
 
     MockContainerAllocator allocator = new MockContainerAllocator(
         clusterResourceManager,
@@ -369,8 +364,7 @@ public class TestContainerProcessManager {
         new MapConfig(conf),
         state,
         new MetricsRegistryMap(),
-        clusterResourceManager
-    );
+        clusterResourceManager, Optional.empty());
 
     MockContainerAllocator allocator = new MockContainerAllocator(
         clusterResourceManager,
@@ -415,8 +409,7 @@ public class TestContainerProcessManager {
         new MapConfig(config),
         state);
 
-    ContainerProcessManager manager = new ContainerProcessManager(config, state, new MetricsRegistryMap(), allocator,
-        clusterResourceManager);
+    ContainerProcessManager manager = new ContainerProcessManager(config, state, new MetricsRegistryMap(), clusterResourceManager, Optional.of(allocator));
 
     manager.start();
     SamzaResource resource = new SamzaResource(1, 1024, "host1", "resource-1");
@@ -443,8 +436,7 @@ public class TestContainerProcessManager {
         cfg,
         state,
         new MetricsRegistryMap(),
-        clusterResourceManager
-    );
+        clusterResourceManager, Optional.empty());
 
     MockHostAwareContainerAllocator allocator = new MockHostAwareContainerAllocator(
         clusterResourceManager,
@@ -510,8 +502,7 @@ public class TestContainerProcessManager {
         new MapConfig(conf),
         state,
         new MetricsRegistryMap(),
-        clusterResourceManager
-    );
+        clusterResourceManager, Optional.empty());
 
     MockContainerAllocator allocator = new MockContainerAllocator(
         clusterResourceManager,
@@ -586,8 +577,7 @@ public class TestContainerProcessManager {
         new MapConfig(conf),
         state,
         new MetricsRegistryMap(),
-        clusterResourceManager
-    );
+        clusterResourceManager, Optional.empty());
 
     MockContainerAllocator allocator = new MockContainerAllocator(
         clusterResourceManager,
@@ -675,8 +665,7 @@ public class TestContainerProcessManager {
         new MapConfig(conf),
         state,
         new MetricsRegistryMap(),
-        clusterResourceManager
-    );
+        clusterResourceManager, Optional.empty());
     taskManager.start();
     SamzaResource container2 = new SamzaResource(1, 1024, "", "id0");
     assertFalse(taskManager.shouldShutdown());
@@ -689,8 +678,7 @@ public class TestContainerProcessManager {
         new MapConfig(config),
         state,
         new MetricsRegistryMap(),
-        clusterResourceManager
-    );
+        clusterResourceManager, Optional.empty());
     taskManager1.start();
     taskManager1.onResourceAllocated(container2);
   }
diff --git a/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestMetricsSnapshotSerdeV2.java b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestMetricsSnapshotSerdeV2.java
index 9f84ce6..b0b65e0 100644
--- a/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestMetricsSnapshotSerdeV2.java
+++ b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestMetricsSnapshotSerdeV2.java
@@ -23,7 +23,7 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.samza.SamzaException;
 import org.apache.samza.diagnostics.DiagnosticsExceptionEvent;
-import org.apache.samza.metrics.ListGauge;
+import org.apache.samza.diagnostics.BoundedList;
 import org.apache.samza.metrics.reporter.Metrics;
 import org.apache.samza.metrics.reporter.MetricsHeader;
 import org.apache.samza.metrics.reporter.MetricsSnapshot;
@@ -40,17 +40,17 @@ public class TestMetricsSnapshotSerdeV2 {
         new MetricsHeader("jobName", "i001", "container 0", "test container ID", "source", "300.14.25.1", "1", "1", 1,
             1);
 
-    ListGauge listGauge = new ListGauge<DiagnosticsExceptionEvent>("exceptions");
+    BoundedList boundedList = new BoundedList<DiagnosticsExceptionEvent>("exceptions");
     DiagnosticsExceptionEvent diagnosticsExceptionEvent1 =
         new DiagnosticsExceptionEvent(1, new SamzaException("this is a samza exception", new RuntimeException("cause")),
             new HashMap());
 
-    listGauge.add(diagnosticsExceptionEvent1);
+    boundedList.add(diagnosticsExceptionEvent1);
 
     String samzaContainerMetricsGroupName = "org.apache.samza.container.SamzaContainerMetrics";
     Map<String, Map<String, Object>> metricMessage = new HashMap<>();
     metricMessage.put(samzaContainerMetricsGroupName, new HashMap<>());
-    metricMessage.get(samzaContainerMetricsGroupName).put("exceptions", listGauge.getValues());
+    metricMessage.get(samzaContainerMetricsGroupName).put("exceptions", boundedList.getValues());
     metricMessage.get(samzaContainerMetricsGroupName).put("commit-calls", 0);
 
     MetricsSnapshot metricsSnapshot = new MetricsSnapshot(metricsHeader, new Metrics(metricMessage));
diff --git a/samza-api/src/test/java/org/apache/samza/metrics/TestListGauge.java b/samza-core/src/test/scala/org/apache/samza/metrics/TestBoundedList.java
similarity index 65%
rename from samza-api/src/test/java/org/apache/samza/metrics/TestListGauge.java
rename to samza-core/src/test/scala/org/apache/samza/metrics/TestBoundedList.java
index eb91012..698877e 100644
--- a/samza-api/src/test/java/org/apache/samza/metrics/TestListGauge.java
+++ b/samza-core/src/test/scala/org/apache/samza/metrics/TestBoundedList.java
@@ -21,40 +21,41 @@ package org.apache.samza.metrics;
 import java.time.Duration;
 import java.util.Collection;
 import java.util.Iterator;
+import org.apache.samza.diagnostics.BoundedList;
 import org.junit.Assert;
 import org.junit.Test;
 
 
 /**
- * Class to encapsulate test-cases for {@link org.apache.samza.metrics.ListGauge}
+ * Class to encapsulate test-cases for {@link BoundedList}
  */
-public class TestListGauge {
+public class TestBoundedList {
 
   private final static Duration THREAD_TEST_TIMEOUT = Duration.ofSeconds(10);
 
-  private <T> ListGauge<T> getListGaugeForTest() {
-    return new ListGauge<T>("sampleListGauge", 10, Duration.ofSeconds(60));
+  private <T> BoundedList<T> getBoundedListForTest() {
+    return new BoundedList<T>("sampleListGauge", 10, Duration.ofSeconds(60));
   }
 
   @Test
   public void basicTest() {
-    ListGauge<String> listGauge = getListGaugeForTest();
-    listGauge.add("sampleValue");
-    Assert.assertEquals("Names should be the same", listGauge.getName(), "sampleListGauge");
-    Assert.assertEquals("List sizes should match", listGauge.getValues().size(), 1);
-    Assert.assertEquals("ListGauge should contain sampleGauge", listGauge.getValues().contains("sampleValue"), true);
+    BoundedList<String> boundedList = getBoundedListForTest();
+    boundedList.add("sampleValue");
+    Assert.assertEquals("Names should be the same", boundedList.getName(), "sampleListGauge");
+    Assert.assertEquals("List sizes should match", boundedList.getValues().size(), 1);
+    Assert.assertEquals("BoundedList should contain sampleGauge", boundedList.getValues().contains("sampleValue"), true);
   }
 
   @Test
   public void testSizeEnforcement() {
-    ListGauge listGauge = getListGaugeForTest();
+    BoundedList boundedList = getBoundedListForTest();
     for (int i = 15; i > 0; i--) {
-      listGauge.add("v" + i);
+      boundedList.add("v" + i);
     }
-    Assert.assertEquals("List sizes should be as configured at creation time", listGauge.getValues().size(), 10);
+    Assert.assertEquals("List sizes should be as configured at creation time", boundedList.getValues().size(), 10);
 
     int valueIndex = 10;
-    Collection<String> currentList = listGauge.getValues();
+    Collection<String> currentList = boundedList.getValues();
     Iterator iterator = currentList.iterator();
     while (iterator.hasNext()) {
       String gaugeValue = (String) iterator.next();
@@ -65,13 +66,13 @@ public class TestListGauge {
 
   @Test
   public void testThreadSafety() throws InterruptedException {
-    ListGauge<Integer> listGauge = getListGaugeForTest();
+    BoundedList<Integer> boundedList = getBoundedListForTest();
 
     Thread thread1 = new Thread(new Runnable() {
       @Override
       public void run() {
         for (int i = 1; i <= 100; i++) {
-          listGauge.add(i);
+          boundedList.add(i);
         }
       }
     });
@@ -80,7 +81,7 @@ public class TestListGauge {
       @Override
       public void run() {
         for (int i = 1; i <= 100; i++) {
-          listGauge.add(i);
+          boundedList.add(i);
         }
       }
     });
@@ -91,8 +92,8 @@ public class TestListGauge {
     thread1.join(THREAD_TEST_TIMEOUT.toMillis());
     thread2.join(THREAD_TEST_TIMEOUT.toMillis());
 
-    Assert.assertTrue("ListGauge should have the last 10 values", listGauge.getValues().size() == 10);
-    for (Integer gaugeValue : listGauge.getValues()) {
+    Assert.assertTrue("BoundedList should have the last 10 values", boundedList.getValues().size() == 10);
+    for (Integer gaugeValue : boundedList.getValues()) {
       Assert.assertTrue("Values should have the last 10 range", gaugeValue <= 100 && gaugeValue > 90);
     }
   }
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/SimpleDiagnosticsAppender.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/SimpleDiagnosticsAppender.java
index 31f0d47..95135b2 100644
--- a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/SimpleDiagnosticsAppender.java
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/SimpleDiagnosticsAppender.java
@@ -21,17 +21,15 @@ package org.apache.samza.logging.log4j;
 
 import org.apache.log4j.AppenderSkeleton;
 import org.apache.log4j.spi.LoggingEvent;
-import org.apache.samza.container.SamzaContainerMetrics;
 import org.apache.samza.diagnostics.DiagnosticsExceptionEvent;
-import org.apache.samza.metrics.ListGauge;
+import org.apache.samza.diagnostics.DiagnosticsManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
  * Provides an in-memory appender that parses LoggingEvents to filter events relevant to diagnostics.
- * Currently, filters exception related events and update an exception metric ({@link ListGauge}) in
- * {@link SamzaContainerMetrics}.
+ * Currently, filters exception related events and updates the {@link DiagnosticsManager}.
  *
  * When used inconjunction with {@link org.apache.samza.metrics.reporter.MetricsSnapshotReporter} provides a
  * stream of diagnostics-related events.
@@ -41,14 +39,14 @@ public class SimpleDiagnosticsAppender extends AppenderSkeleton {
 
   // simple object to synchronize root logger attachment
   private static final Object SYNCHRONIZATION_OBJECT = new Object();
-  protected final ListGauge<DiagnosticsExceptionEvent> samzaContainerExceptionMetric;
+  protected final DiagnosticsManager diagnosticsManager;
 
   /**
    * A simple log4j1.2.* appender, which attaches itself to the root logger.
    * Attachment to the root logger is thread safe.
    */
-  public SimpleDiagnosticsAppender(SamzaContainerMetrics samzaContainerMetrics) {
-    this.samzaContainerExceptionMetric = samzaContainerMetrics.exceptions();
+  public SimpleDiagnosticsAppender(DiagnosticsManager diagnosticsManager) {
+    this.diagnosticsManager = diagnosticsManager;
     this.setName(SimpleDiagnosticsAppender.class.getName());
 
     synchronized (SYNCHRONIZATION_OBJECT) {
@@ -74,7 +72,7 @@ public class SimpleDiagnosticsAppender extends AppenderSkeleton {
             new DiagnosticsExceptionEvent(loggingEvent.timeStamp, loggingEvent.getThrowableInformation().getThrowable(),
                 loggingEvent.getProperties());
 
-        samzaContainerExceptionMetric.add(diagnosticsExceptionEvent);
+        diagnosticsManager.addExceptionEvent(diagnosticsExceptionEvent);
         LOG.debug("Received DiagnosticsExceptionEvent " + diagnosticsExceptionEvent);
       } else {
         LOG.debug("Received non-exception event with message " + loggingEvent.getMessage());
diff --git a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/SimpleDiagnosticsAppender.java b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/SimpleDiagnosticsAppender.java
index 37133db..932b3f7 100644
--- a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/SimpleDiagnosticsAppender.java
+++ b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/SimpleDiagnosticsAppender.java
@@ -26,16 +26,14 @@ import org.apache.logging.log4j.core.LoggerContext;
 import org.apache.logging.log4j.core.appender.AbstractAppender;
 import org.apache.logging.log4j.core.config.Configuration;
 import org.apache.logging.log4j.core.config.LoggerConfig;
-import org.apache.samza.container.SamzaContainerMetrics;
 import org.apache.samza.diagnostics.DiagnosticsExceptionEvent;
-import org.apache.samza.metrics.ListGauge;
+import org.apache.samza.diagnostics.DiagnosticsManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Provides an in-memory appender that parses LogEvents to filter events relevant to diagnostics.
- * Currently, filters exception related events and update an exception metric ({@link ListGauge}) in
- * {@link SamzaContainerMetrics}.
+ * Currently, filters exception related events and updates the {@link DiagnosticsManager}.
  *
  * When used in conjunction with {@link org.apache.samza.metrics.reporter.MetricsSnapshotReporter} provides a
  * stream of diagnostics-related events.
@@ -45,11 +43,11 @@ public class SimpleDiagnosticsAppender extends AbstractAppender {
 
   // simple object to synchronize root logger attachment
   private static final Object SYNCHRONIZATION_OBJECT = new Object();
-  protected final ListGauge<DiagnosticsExceptionEvent> samzaContainerExceptionMetric;
+  protected final DiagnosticsManager diagnosticsManager;
 
-  public SimpleDiagnosticsAppender(SamzaContainerMetrics samzaContainerMetrics) {
+  public SimpleDiagnosticsAppender(DiagnosticsManager diagnosticsManager) {
     super(SimpleDiagnosticsAppender.class.getName(), null, null);
-    this.samzaContainerExceptionMetric = samzaContainerMetrics.exceptions();
+    this.diagnosticsManager = diagnosticsManager;
 
     synchronized (SYNCHRONIZATION_OBJECT) {
       attachAppenderToLoggers(this);
@@ -82,7 +80,7 @@ public class SimpleDiagnosticsAppender extends AbstractAppender {
             new DiagnosticsExceptionEvent(logEvent.getTimeMillis(), logEvent.getThrown(),
                 logEvent.getContextData().toMap());
 
-        samzaContainerExceptionMetric.add(diagnosticsExceptionEvent);
+        diagnosticsManager.addExceptionEvent(diagnosticsExceptionEvent);
         LOG.debug("Received DiagnosticsExceptionEvent " + diagnosticsExceptionEvent);
       } else {
         LOG.debug("Received non-exception event with message " + logEvent.getMessage());
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/util/TestMetricsRegistryImpl.java b/samza-sql/src/test/java/org/apache/samza/sql/util/TestMetricsRegistryImpl.java
index 4dd4dd8..9a4b8c9 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/util/TestMetricsRegistryImpl.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/TestMetricsRegistryImpl.java
@@ -24,7 +24,6 @@ import java.util.List;
 import java.util.Map;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Gauge;
-import org.apache.samza.metrics.ListGauge;
 import org.apache.samza.metrics.Timer;
 
 
@@ -36,7 +35,6 @@ public class TestMetricsRegistryImpl implements org.apache.samza.metrics.Metrics
   private Map<String, List<Counter>> counters = new HashMap<>();
   private Map<String, List<Timer>> timers = new HashMap<>();
   private Map<String, List<Gauge<?>>> gauges = new HashMap<>();
-  private Map<String, List<ListGauge>> listGauges = new HashMap<>();
 
   @Override
   public Counter newCounter(String group, String name) {
@@ -107,11 +105,5 @@ public class TestMetricsRegistryImpl implements org.apache.samza.metrics.Metrics
     return gauges;
   }
 
-  @Override
-  public ListGauge newListGauge(String group, ListGauge listGauge) {
-    listGauges.putIfAbsent(group, new ArrayList());
-    listGauges.get(group).add(listGauge);
-    return listGauge;
-  }
 
 }
diff --git a/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java b/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
index 58adf26..9524145 100644
--- a/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
+++ b/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
@@ -45,7 +45,7 @@ public class YarnConfig extends MapConfig {
   /**
    * (Optional) JVM options to include in the command line when executing the AM
    */
-  public static final String AM_JVM_OPTIONS = "yarn.am.opts";
+  public static final String AM_JVM_OPTIONS = "yarn.am.opts";
 
   /**
    * Determines whether a JMX server should be started on the AM
diff --git a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
index 25b6855..56017f1 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
@@ -46,9 +46,6 @@ object ApplicationMasterRestServlet {
       metricsRegistry.getGroup(group).asScala.foreach {
         case (name, metric) =>
           metric.visit(new MetricsVisitor() {
-            def listGauge[T](listGauge: ListGauge[T]) =
-              groupMap.put(name, listGauge.getValues)
-
             def counter(counter: Counter) =
               groupMap.put(counter.getName, counter.getCount: lang.Long)