You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2021/01/07 20:29:24 UTC

[samza] branch master updated: SAMZA-2615: Metrics for Application Master High Availability (#1455)

This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new d003651  SAMZA-2615: Metrics for Application Master High Availability (#1455)
d003651 is described below

commit d0036516509674252edeeb181d994824f0bbec9a
Author: mynameborat <bh...@gmail.com>
AuthorDate: Thu Jan 7 12:29:17 2021 -0800

    SAMZA-2615: Metrics for Application Master High Availability (#1455)
    
    Description:
    The Application Master High Availability (AM HA) feature was introduced in PRs #1442, #1448, #1449, and #1450. This change adds the relevant metrics for the components introduced as part of AM HA.
    
    Changes:
    Metrics for the dynamic container heartbeat mechanism
    Refactored ContainerHeartbeatMonitor to take a Config object
    Metrics cleanup for metadata management
    Metrics documentation update
    Additional Application Master metrics to track the number of containers carried over from the previous attempt
---
 .../versioned/container/metrics-table.html         |  57 +++++++
 .../samza/container/ContainerHeartbeatMonitor.java | 112 +++++++++++--
 .../coordinator/JobCoordinatorMetadataManager.java | 180 +++++++++++++--------
 .../apache/samza/runtime/ContainerLaunchUtil.java  |  18 +--
 .../container/TestContainerHeartbeatMonitor.java   | 104 ++++++++----
 .../TestJobCoordinatorMetadataManager.java         |  19 ++-
 .../samza/job/yarn/YarnClusterResourceManager.java |   1 +
 .../samza/job/yarn/SamzaAppMasterMetrics.scala     |   8 +-
 .../job/yarn/TestYarnClusterResourceManager.java   |   3 +
 .../webapp/TestApplicationMasterRestClient.java    |   3 +-
 10 files changed, 373 insertions(+), 132 deletions(-)

diff --git a/docs/learn/documentation/versioned/container/metrics-table.html b/docs/learn/documentation/versioned/container/metrics-table.html
index acfbbec..3bab5be 100644
--- a/docs/learn/documentation/versioned/container/metrics-table.html
+++ b/docs/learn/documentation/versioned/container/metrics-table.html
@@ -144,6 +144,8 @@
     <li><a href="#elasticsearch-system-producer-metrics">ElasticsearchSystemProducerMetrics</a></li>
     <li><a href="#zookeeper-client-metrics">ZookeeperClientMetrics</a></li>
     <li><a href="#zookeeper-job-coordinator-metrics">ZkJobCoordinatorMetrics</a></li>
+    <li><a href="#job-coordinator-metadata-manager-metrics">JobCoordinatorMetadataManagerMetrics</a></li>
+    <li><a href="#container-heartbeat-monitor-metrics">ContainerHeartbeatMonitorMetrics</a></li>
 </ul>
 <p>Words highlighted like <span class="system">this</span> are placeholders for your own variable names defined in configuration file or system variables defined while starting the job.</p>
 <p id="average-time" style="color: #00a">Note: Average time is calculated for the current time window (set to 300 seconds)</p>
@@ -509,6 +511,10 @@
         <td>heartbeats-expired</td>
         <td>Number of heartbeat requests from containers that are invalid</td>
     </tr>
+    <tr>
+        <td>container-from-previous-attempt</td>
+        <td>Number of containers carried from previous attempt in YARN. The metrics is applicable only when Application Master High Availability is enabled</td>
+    </tr>
 
     <tr>
         <th colspan="2" class="section" id="kafka-system-consumer-metrics">org.apache.samza.system.kafka.KafkaSystemConsumerMetrics</th>
@@ -977,6 +983,57 @@
         <td>single-barrier-rebalancing-time</td>
         <td><a href="#average-time">Average time</a> taken for all the processors to get the latest version of the job model after single processor change (without the occurence of a barrier timeout)</td>
     </tr>
+    <tr>
+        <th colspan="2" class="section" id="job-coordinator-metadata-manager-metrics">org.apache.samza.coordinator.JobCoordinatorMetadataManager.JobCoordinatorMetadataManagerMetrics<br><span style="font-weight: normal;margin-left:40px;"><b>Note</b>: The following metrics are applicable when Application Master High Availability is enabled</span></th>
+    </tr>
+    <tr>
+        <td>application-attempt-count</td>
+        <td>Denotes the application attempt count within the scope of a single deployment</td>
+    </tr>
+    <tr>
+        <td>config-changed</td>
+        <td>Denotes the number of times configuration changed across attempts within the scope of a single deployment</td>
+    </tr>
+    <tr>
+        <td>job-model-changed</td>
+        <td>Denotes the number of times job model changed across attempts within the scope of a single deployment</td>
+    </tr>
+    <tr>
+        <td>metadata-read-failed-count</td>
+        <td>Read failure count from the underlying metadata store</td>
+    </tr>
+    <tr>
+        <td>metadata-write-failed-count</td>
+        <td>Write failure count to the underlying metadata store</td>
+    </tr>
+    <tr>
+        <td>metadata-generation-failed-count</td>
+        <td>Number of times the metadata generation failed</td>
+    </tr>
+    <tr>
+        <td>new-deployment</td>
+        <td>Denotes a new deployment due to changes in metadata either attributed to config or job model</td>
+    </tr>
+    </tbody>
+    <tr>
+        <th colspan="2" class="section" id="container-heartbeat-monitor-metrics">org.apache.samza.container.ContainerHeartbeatMonitor.ContainerHeartbeatMetrics<br><span style="font-weight: normal;margin-left:40px;"><b>Note</b>: The following metrics are applicable when Application Master High Availability is enabled</span></th>
+    </tr>
+    <tr>
+        <td>heartbeat-discovery-time-ms</td>
+        <td>Time taken in millis for the container to discover new AM in the event AM changes for heartbeat establishment</td>
+    </tr>
+    <tr>
+        <td>heartbeat-established-failure-count</td>
+        <td>Number of failed attempts to establish heartbeat in the event of AM changes</td>
+    </tr>
+    <tr>
+        <td>heartbeat-established-with-new-am-count</td>
+        <td>Number of times heartbeat is established with the new AM in the event of AM changes</td>
+    </tr>
+    <tr>
+        <td>heartbeat-expired-count</td>
+        <td>Number of times heartbeat expired with the active AM</td>
+    </tr>
     </tbody>
 </table>
 </body>
diff --git a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
index 0eba766..c69e6f6 100644
--- a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
+++ b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
@@ -20,6 +20,7 @@
 package org.apache.samza.container;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
@@ -27,10 +28,19 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.coordinator.CoordinationConstants;
 import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
 import org.apache.samza.coordinator.stream.messages.SetConfig;
 import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.util.MetricsReporterLoader;
 import org.apache.samza.util.ThreadUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +50,7 @@ public class ContainerHeartbeatMonitor {
   private static final Logger LOG = LoggerFactory.getLogger(ContainerHeartbeatMonitor.class);
   private static final ThreadFactory THREAD_FACTORY = new HeartbeatThreadFactory();
   private static final CoordinatorStreamValueSerde SERDE = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+  private static final String SOURCE_NAME = "SamzaContainer";
 
   @VisibleForTesting
   static final int SCHEDULE_MS = 60000;
@@ -55,31 +66,35 @@ public class ContainerHeartbeatMonitor {
   private final long retryCount;
 
   private ContainerHeartbeatClient containerHeartbeatClient;
+  private ContainerHeartbeatMetrics metrics;
+  private Map<String, MetricsReporter> reporters;
   private String coordinatorUrl;
   private boolean started = false;
 
   public ContainerHeartbeatMonitor(Runnable onContainerExpired, String coordinatorUrl, String containerExecutionId,
-      MetadataStore coordinatorStreamStore, boolean isApplicationMasterHighAvailabilityEnabled, long retryCount,
-      long sleepDurationForReconnectWithAM) {
+      MetadataStore coordinatorStreamStore, Config config) {
     this(onContainerExpired, new ContainerHeartbeatClient(coordinatorUrl, containerExecutionId),
         Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY), coordinatorUrl, containerExecutionId,
-        coordinatorStreamStore, isApplicationMasterHighAvailabilityEnabled, retryCount, sleepDurationForReconnectWithAM);
+        coordinatorStreamStore, config);
   }
 
   @VisibleForTesting
   ContainerHeartbeatMonitor(Runnable onContainerExpired, ContainerHeartbeatClient containerHeartbeatClient,
       ScheduledExecutorService scheduler, String coordinatorUrl, String containerExecutionId,
-      MetadataStore coordinatorStreamStore, boolean isApplicationMasterHighAvailabilityEnabled,
-      long retryCount, long sleepDurationForReconnectWithAM) {
+      MetadataStore coordinatorStreamStore, Config config) {
     this.onContainerExpired = onContainerExpired;
     this.containerHeartbeatClient = containerHeartbeatClient;
     this.scheduler = scheduler;
     this.coordinatorUrl = coordinatorUrl;
     this.containerExecutionId = containerExecutionId;
     this.coordinatorStreamStore = coordinatorStreamStore;
-    this.isApplicationMasterHighAvailabilityEnabled = isApplicationMasterHighAvailabilityEnabled;
-    this.retryCount = retryCount;
-    this.sleepDurationForReconnectWithAM = sleepDurationForReconnectWithAM;
+
+    JobConfig jobConfig = new JobConfig(config);
+    this.isApplicationMasterHighAvailabilityEnabled = jobConfig.getApplicationMasterHighAvailabilityEnabled();
+    this.retryCount = jobConfig.getContainerHeartbeatRetryCount();
+    this.sleepDurationForReconnectWithAM = jobConfig.getContainerHeartbeatRetrySleepDurationMs();
+
+    initializeMetrics(config);
   }
 
   public void start() {
@@ -91,6 +106,7 @@ public class ContainerHeartbeatMonitor {
     scheduler.scheduleAtFixedRate(() -> {
       ContainerHeartbeatResponse response = containerHeartbeatClient.requestHeartbeat();
       if (!response.isAlive()) {
+        metrics.incrementHeartbeatExpiredCount();
         if (isApplicationMasterHighAvailabilityEnabled) {
           LOG.warn("Failed to establish connection with {}. Checking for new AM", coordinatorUrl);
           try {
@@ -100,7 +116,8 @@ public class ContainerHeartbeatMonitor {
           } catch (Exception e) {
             // On exception in re-establish connection with new AM, force exit.
             LOG.error("Exception trying to connect with new AM", e);
-            forceExit("failure in establishing cconnection with new AM", 0);
+            metrics.incrementHeartbeatEstablishedFailureCount();
+            forceExit("failure in establishing connection with new AM", SHUTDOWN_TIMOUT_MS);
             return;
           }
         }
@@ -115,6 +132,7 @@ public class ContainerHeartbeatMonitor {
     if (started) {
       LOG.info("Stopping ContainerHeartbeatMonitor");
       scheduler.shutdown();
+      reporters.values().forEach(MetricsReporter::stop);
     }
   }
 
@@ -122,6 +140,7 @@ public class ContainerHeartbeatMonitor {
     boolean response = false;
     int attempt = 1;
 
+    long startTime = System.currentTimeMillis();
     while (attempt <= retryCount) {
       String newCoordinatorUrl = SERDE.fromBytes(coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL));
       try {
@@ -142,16 +161,40 @@ public class ContainerHeartbeatMonitor {
       }
       attempt++;
     }
+
+    metrics.setHeartbeatDiscoveryTime(System.currentTimeMillis() - startTime);
+    if (response) {
+      metrics.incrementHeartbeatEstablishedWithNewAmCount();
+    } else {
+      metrics.incrementHeartbeatEstablishedFailureCount();
+    }
+
     return response;
   }
 
+  private void initializeMetrics(Config config) {
+    reporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), SOURCE_NAME);
+    MetricsRegistryMap registryMap = new MetricsRegistryMap();
+    metrics = new ContainerHeartbeatMetrics(registryMap);
+    reporters.values().forEach(reporter -> reporter.register(SOURCE_NAME, registryMap));
+  }
+
   @VisibleForTesting
   ContainerHeartbeatClient createContainerHeartbeatClient(String coordinatorUrl, String containerExecutionId) {
     return new ContainerHeartbeatClient(coordinatorUrl, containerExecutionId);
   }
 
+  @VisibleForTesting
+  ContainerHeartbeatMetrics getMetrics() {
+    return metrics;
+  }
+
   private void forceExit(String message, int timeout) {
     scheduler.schedule(() -> {
+      if (started) {
+        reporters.values().forEach(MetricsReporter::stop);
+      }
+
       LOG.error(message);
       ThreadUtil.logThreadDump("Thread dump at heartbeat monitor: due to " + message);
       System.exit(1);
@@ -170,4 +213,55 @@ public class ContainerHeartbeatMonitor {
       return t;
     }
   }
+
+  static final class ContainerHeartbeatMetrics {
+    private static final String GROUP = "ContainerHeartbeatMonitor";
+    private static final String HEARTBEAT_DISCOVERY_TIME_MS = "heartbeat-discovery-time-ms";
+    private static final String HEARTBEAT_ESTABLISHED_FAILURE_COUNT = "heartbeat-established-failure-count";
+    private static final String HEARTBEAT_ESTABLISHED_WITH_NEW_AM_COUNT = "heartbeat-established-with-new-am-count";
+    private static final String HEARTBEAT_EXPIRED_COUNT = "heartbeat-expired-count";
+
+    private final Counter heartbeatEstablishedFailureCount;
+    private final Counter heartbeatEstablishedWithNewAmCount;
+    private final Counter heartbeatExpiredCount;
+    private final Gauge<Long> heartbeatDiscoveryTime;
+
+    public ContainerHeartbeatMetrics(MetricsRegistry registry) {
+      heartbeatEstablishedFailureCount = registry.newCounter(GROUP, HEARTBEAT_ESTABLISHED_FAILURE_COUNT);
+      heartbeatEstablishedWithNewAmCount = registry.newCounter(GROUP, HEARTBEAT_ESTABLISHED_WITH_NEW_AM_COUNT);
+      heartbeatExpiredCount = registry.newCounter(GROUP, HEARTBEAT_EXPIRED_COUNT);
+      heartbeatDiscoveryTime = registry.newGauge(GROUP, HEARTBEAT_DISCOVERY_TIME_MS, 0L);
+    }
+
+    @VisibleForTesting
+    Counter getHeartbeatEstablishedFailureCount() {
+      return heartbeatEstablishedFailureCount;
+    }
+
+    @VisibleForTesting
+    Counter getHeartbeatEstablishedWithNewAmCount() {
+      return heartbeatEstablishedWithNewAmCount;
+    }
+
+    @VisibleForTesting
+    Counter getHeartbeatExpiredCount() {
+      return heartbeatExpiredCount;
+    }
+
+    private void incrementHeartbeatEstablishedFailureCount() {
+      heartbeatEstablishedFailureCount.inc();
+    }
+
+    private void incrementHeartbeatEstablishedWithNewAmCount() {
+      heartbeatEstablishedWithNewAmCount.inc();
+    }
+
+    private void incrementHeartbeatExpiredCount() {
+      heartbeatExpiredCount.inc();
+    }
+
+    private void setHeartbeatDiscoveryTime(long timeInMillis) {
+      heartbeatDiscoveryTime.set(timeInMillis);
+    }
+  }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java
index c4540a6..cbc9014 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java
@@ -50,25 +50,11 @@ import org.slf4j.LoggerFactory;
  */
 public class JobCoordinatorMetadataManager {
   private static final Logger LOG = LoggerFactory.getLogger(JobCoordinatorMetadataManager.class);
-  private static final String APPLICATION_ATTEMPT_COUNT = "applicationAttemptCount";
-  private static final String JOB_COORDINATOR_MANAGER_METRICS = "job-coordinator-manager";
-  private static final String JOB_MODEL_CHANGED = "jobModelChanged";
-  private static final String CONFIG_CHANGED = "configChanged";
-  private static final String METADATA_GENERATION_FAILED_COUNT = "metadataGenerationFailedCount";
-  private static final String METADATA_READ_FAILED_COUNT = "metadataReadFailedCount";
-  private static final String METADATA_WRITE_FAILED_COUNT = "metadataWriteFailedCount";
-  private static final String NEW_DEPLOYMENT = "newDeployment";
 
   static final String CONTAINER_ID_PROPERTY = "CONTAINER_ID";
   static final String CONTAINER_ID_DELIMITER = "_";
 
-  private final Counter applicationAttemptCount;
-  private final Counter metadataGenerationFailedCount;
-  private final Counter metadataReadFailedCount;
-  private final Counter metadataWriteFailedCount;
-  private final Gauge<Integer> jobModelChangedAcrossApplicationAttempt;
-  private final Gauge<Integer> configChangedAcrossApplicationAttempt;
-  private final Gauge<Integer> newDeployment;
+  private final JobCoordinatorMetadataManagerMetrics metrics;
   private final MetadataStore metadataStore;
   private final ObjectMapper metadataMapper = SamzaObjectMapper.getObjectMapper();
   private final Serde<String> valueSerde;
@@ -88,17 +74,7 @@ public class JobCoordinatorMetadataManager {
     this.clusterType = clusterType;
     this.metadataStore = metadataStore;
     this.valueSerde = valueSerde;
-
-    applicationAttemptCount = metricsRegistry.newCounter(JOB_COORDINATOR_MANAGER_METRICS, APPLICATION_ATTEMPT_COUNT);
-    configChangedAcrossApplicationAttempt =
-        metricsRegistry.newGauge(JOB_COORDINATOR_MANAGER_METRICS, CONFIG_CHANGED, 0);
-    jobModelChangedAcrossApplicationAttempt =
-        metricsRegistry.newGauge(JOB_COORDINATOR_MANAGER_METRICS, JOB_MODEL_CHANGED, 0);
-    metadataGenerationFailedCount = metricsRegistry.newCounter(JOB_COORDINATOR_MANAGER_METRICS,
-        METADATA_GENERATION_FAILED_COUNT);
-    metadataReadFailedCount = metricsRegistry.newCounter(JOB_COORDINATOR_MANAGER_METRICS, METADATA_READ_FAILED_COUNT);
-    metadataWriteFailedCount = metricsRegistry.newCounter(JOB_COORDINATOR_MANAGER_METRICS, METADATA_WRITE_FAILED_COUNT);
-    newDeployment = metricsRegistry.newGauge(JOB_COORDINATOR_MANAGER_METRICS, NEW_DEPLOYMENT, 0);
+    this.metrics = new JobCoordinatorMetadataManagerMetrics(metricsRegistry);
   }
 
   /**
@@ -140,7 +116,7 @@ public class JobCoordinatorMetadataManager {
       return new JobCoordinatorMetadata(fetchEpochIdForJobCoordinator(), String.valueOf(configId),
           String.valueOf(jobModelId));
     } catch (Exception e) {
-      metadataGenerationFailedCount.inc();
+      metrics.incrementMetadataGenerationFailedCount();
       LOG.error("Failed to generate metadata for the current attempt due to ", e);
       throw new SamzaException("Failed to generate the metadata for the current attempt due to ", e);
     }
@@ -163,16 +139,16 @@ public class JobCoordinatorMetadataManager {
     boolean changed = true;
 
     if (previousMetadata == null) {
-      newDeployment.set(1);
+      metrics.setNewDeployment(1);
     } else if (!previousMetadata.getEpochId().equals(newMetadata.getEpochId())) {
-      newDeployment.set(1);
+      metrics.setNewDeployment(1);
     } else if (!previousMetadata.getJobModelId().equals(newMetadata.getJobModelId())) {
-      jobModelChangedAcrossApplicationAttempt.set(1);
+      metrics.setJobModelChangedAcrossApplicationAttempt(1);
     } else if (!previousMetadata.getConfigId().equals(newMetadata.getConfigId())) {
-      configChangedAcrossApplicationAttempt.set(1);
+      metrics.setConfigChangedAcrossApplicationAttempt(1);
     } else {
       changed = false;
-      applicationAttemptCount.inc();
+      metrics.incrementApplicationAttemptCount();
     }
 
     if (changed) {
@@ -199,7 +175,7 @@ public class JobCoordinatorMetadataManager {
           metadata = metadataMapper.readValue(metadataString, JobCoordinatorMetadata.class);
           break;
         } catch (Exception e) {
-          metadataReadFailedCount.inc();
+          metrics.incrementMetadataReadFailedCount();
           LOG.error("Failed to read job coordinator metadata due to ", e);
         }
       }
@@ -225,7 +201,7 @@ public class JobCoordinatorMetadataManager {
       metadataStore.put(clusterType.name(), valueSerde.toBytes(metadataValueString));
       LOG.info("Successfully written job coordinator metadata: {} for cluster {}.", metadata, clusterType);
     } catch (Exception e) {
-      metadataWriteFailedCount.inc();
+      metrics.incrementMetadataWriteFailedCount();
       LOG.error("Failed to write the job coordinator metadata to metadata store due to ", e);
       throw new SamzaException("Failed to write the job coordinator metadata.", e);
     }
@@ -258,43 +234,13 @@ public class JobCoordinatorMetadataManager {
   }
 
   @VisibleForTesting
-  Counter getApplicationAttemptCount() {
-    return applicationAttemptCount;
-  }
-
-  @VisibleForTesting
-  Counter getMetadataGenerationFailedCount() {
-    return metadataGenerationFailedCount;
-  }
-
-  @VisibleForTesting
-  Counter getMetadataReadFailedCount() {
-    return metadataReadFailedCount;
-  }
-
-  @VisibleForTesting
-  Counter getMetadataWriteFailedCount() {
-    return metadataWriteFailedCount;
-  }
-
-  @VisibleForTesting
-  Gauge<Integer> getJobModelChangedAcrossApplicationAttempt() {
-    return jobModelChangedAcrossApplicationAttempt;
-  }
-
-  @VisibleForTesting
-  Gauge<Integer> getConfigChangedAcrossApplicationAttempt() {
-    return configChangedAcrossApplicationAttempt;
-  }
-
-  @VisibleForTesting
-  Gauge<Integer> getNewDeployment() {
-    return newDeployment;
+  String getEnvProperty(String propertyName) {
+    return System.getenv(propertyName);
   }
 
   @VisibleForTesting
-  String getEnvProperty(String propertyName) {
-    return System.getenv(propertyName);
+  JobCoordinatorMetadataManagerMetrics getMetrics() {
+    return metrics;
   }
 
   /**
@@ -324,4 +270,102 @@ public class JobCoordinatorMetadataManager {
   public enum ClusterType {
     YARN
   }
+
+  /**
+   * A container class to hold all the metrics related to {@link JobCoordinatorMetadataManager}.
+   */
+  static class JobCoordinatorMetadataManagerMetrics {
+    private static final String APPLICATION_ATTEMPT_COUNT = "application-attempt-count";
+    private static final String GROUP = "JobCoordinatorMetadataManager";
+    private static final String JOB_MODEL_CHANGED = "job-model-changed";
+    private static final String CONFIG_CHANGED = "config-changed";
+    private static final String METADATA_GENERATION_FAILED_COUNT = "metadata-generation-failed-count";
+    private static final String METADATA_READ_FAILED_COUNT = "metadata-read-failed-count";
+    private static final String METADATA_WRITE_FAILED_COUNT = "metadata-write-failed-count";
+    private static final String NEW_DEPLOYMENT = "new-deployment";
+
+    private final Counter applicationAttemptCount;
+    private final Counter metadataGenerationFailedCount;
+    private final Counter metadataReadFailedCount;
+    private final Counter metadataWriteFailedCount;
+    private final Gauge<Integer> jobModelChangedAcrossApplicationAttempt;
+    private final Gauge<Integer> configChangedAcrossApplicationAttempt;
+    private final Gauge<Integer> newDeployment;
+
+    public JobCoordinatorMetadataManagerMetrics(MetricsRegistry registry) {
+      applicationAttemptCount = registry.newCounter(GROUP, APPLICATION_ATTEMPT_COUNT);
+      configChangedAcrossApplicationAttempt =
+          registry.newGauge(GROUP, CONFIG_CHANGED, 0);
+      jobModelChangedAcrossApplicationAttempt =
+          registry.newGauge(GROUP, JOB_MODEL_CHANGED, 0);
+      metadataGenerationFailedCount = registry.newCounter(GROUP,
+          METADATA_GENERATION_FAILED_COUNT);
+      metadataReadFailedCount = registry.newCounter(GROUP, METADATA_READ_FAILED_COUNT);
+      metadataWriteFailedCount = registry.newCounter(GROUP, METADATA_WRITE_FAILED_COUNT);
+      newDeployment = registry.newGauge(GROUP, NEW_DEPLOYMENT, 0);
+    }
+
+    @VisibleForTesting
+    Counter getApplicationAttemptCount() {
+      return applicationAttemptCount;
+    }
+
+    @VisibleForTesting
+    Counter getMetadataGenerationFailedCount() {
+      return metadataGenerationFailedCount;
+    }
+
+    @VisibleForTesting
+    Counter getMetadataReadFailedCount() {
+      return metadataReadFailedCount;
+    }
+
+    @VisibleForTesting
+    Counter getMetadataWriteFailedCount() {
+      return metadataWriteFailedCount;
+    }
+
+    @VisibleForTesting
+    Gauge<Integer> getJobModelChangedAcrossApplicationAttempt() {
+      return jobModelChangedAcrossApplicationAttempt;
+    }
+
+    @VisibleForTesting
+    Gauge<Integer> getConfigChangedAcrossApplicationAttempt() {
+      return configChangedAcrossApplicationAttempt;
+    }
+
+    @VisibleForTesting
+    Gauge<Integer> getNewDeployment() {
+      return newDeployment;
+    }
+
+    void incrementApplicationAttemptCount() {
+      applicationAttemptCount.inc();
+    }
+
+    void incrementMetadataGenerationFailedCount() {
+      metadataGenerationFailedCount.inc();
+    }
+
+    void incrementMetadataReadFailedCount() {
+      metadataReadFailedCount.inc();
+    }
+
+    void incrementMetadataWriteFailedCount() {
+      metadataWriteFailedCount.inc();
+    }
+
+    void setConfigChangedAcrossApplicationAttempt(int value) {
+      configChangedAcrossApplicationAttempt.set(value);
+    }
+
+    void setJobModelChangedAcrossApplicationAttempt(int value) {
+      jobModelChangedAcrossApplicationAttempt.set(value);
+    }
+
+    void setNewDeployment(int value) {
+      newDeployment.set(value);
+    }
+  }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index 459ad89..8e9f83d 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -145,12 +145,8 @@ public class ContainerLaunchUtil {
           listener = new ClusterBasedProcessorLifecycleListener(config, processorLifecycleListener, container::shutdown);
       container.setContainerListener(listener);
 
-      JobConfig jobConfig = new JobConfig(config);
-      ContainerHeartbeatMonitor heartbeatMonitor =
-          createContainerHeartbeatMonitor(container,
-              new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetConfig.TYPE),
-              jobConfig.getApplicationMasterHighAvailabilityEnabled(), jobConfig.getContainerHeartbeatRetryCount(),
-              jobConfig.getContainerHeartbeatRetrySleepDurationMs());
+      ContainerHeartbeatMonitor heartbeatMonitor = createContainerHeartbeatMonitor(container,
+          new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetConfig.TYPE), config);
       if (heartbeatMonitor != null) {
         heartbeatMonitor.start();
       }
@@ -206,14 +202,11 @@ public class ContainerLaunchUtil {
    * Creates a new container heartbeat monitor if possible.
    * @param container the container to monitor
    * @param coordinatorStreamStore the metadata store to fetch coordinator url from
-   * @param isApplicaitonMasterHighAvailabilityEnabled whether AM HA is enabled to fetch new AM url
-   * @param retryCount number of times to retry connecting to new AM when heartbeat expires
-   * @param sleepDurationForReconnectWithAM sleep duration between retries to connect to new AM when heartbeat expires
+   * @param config the job configuration
    * @return a new {@link ContainerHeartbeatMonitor} instance, or null if could not create one
    */
   private static ContainerHeartbeatMonitor createContainerHeartbeatMonitor(SamzaContainer container,
-      MetadataStore coordinatorStreamStore, boolean isApplicaitonMasterHighAvailabilityEnabled, long retryCount,
-      long sleepDurationForReconnectWithAM) {
+      MetadataStore coordinatorStreamStore, Config config) {
     String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL);
     String executionEnvContainerId = System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID);
     if (executionEnvContainerId != null) {
@@ -226,8 +219,7 @@ public class ContainerLaunchUtil {
           log.error("Heartbeat monitor failed to shutdown the container gracefully. Exiting process.", e);
           System.exit(1);
         }
-      }, coordinatorUrl, executionEnvContainerId, coordinatorStreamStore, isApplicaitonMasterHighAvailabilityEnabled,
-          retryCount, sleepDurationForReconnectWithAM);
+      }, coordinatorUrl, executionEnvContainerId, coordinatorStreamStore, config);
     } else {
       log.warn("Execution environment container id not set. Container heartbeat monitor will not be created");
       return null;
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java b/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
index 5c0bf16..61096e5 100644
--- a/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
+++ b/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
@@ -19,9 +19,13 @@
 
 package org.apache.samza.container;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
 import org.apache.samza.coordinator.CoordinationConstants;
 import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
 import org.apache.samza.coordinator.stream.messages.SetConfig;
@@ -31,6 +35,7 @@ import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
@@ -43,6 +48,11 @@ import static org.mockito.Mockito.when;
 
 
 public class TestContainerHeartbeatMonitor {
+  private static final String COORDINATOR_URL = "http://some-host.prod.linkedin.com";
+  private static final ContainerHeartbeatResponse FAILURE_RESPONSE = new ContainerHeartbeatResponse(false);
+  private static final ContainerHeartbeatResponse SUCCESS_RESPONSE = new ContainerHeartbeatResponse(true);
+  private static final String CONTAINER_EXECUTION_ID = "0";
+
   @Mock
   private Runnable onExpired;
   @Mock
@@ -58,18 +68,22 @@ public class TestContainerHeartbeatMonitor {
 
   private ContainerHeartbeatMonitor containerHeartbeatMonitor;
 
-  private static final String COORDINATOR_URL = "http://some-host.prod.linkedin.com";
-  private static final ContainerHeartbeatResponse FAILURE_RESPONSE = new ContainerHeartbeatResponse(false);
-  private static final ContainerHeartbeatResponse SUCCESS_RESPONSE = new ContainerHeartbeatResponse(true);
-
   @Before
   public void setup() {
     MockitoAnnotations.initMocks(this);
     this.schedulerFixedRateExecutionLatch = new CountDownLatch(1);
     this.scheduler = buildScheduledExecutorService(this.schedulerFixedRateExecutionLatch);
-    this.containerHeartbeatMonitor =
-        new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
-            "0", coordinatorStreamStore, false, 5, 10);
+    this.containerHeartbeatMonitor = buildContainerHeartbeatMonitor(false);
+  }
+
+  private ContainerHeartbeatMonitor buildContainerHeartbeatMonitor(boolean enableAMHighAvailability) {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put(JobConfig.YARN_AM_HIGH_AVAILABILITY_ENABLED, String.valueOf(enableAMHighAvailability));
+    configMap.put(JobConfig.YARN_CONTAINER_HEARTBEAT_RETRY_COUNT, "5");
+    configMap.put(JobConfig.YARN_CONTAINER_HEARTBEAT_RETRY_SLEEP_DURATION_MS, "10");
+
+    return new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
+        CONTAINER_EXECUTION_ID, coordinatorStreamStore, new MapConfig(configMap));
   }
 
   @Test
@@ -81,6 +95,8 @@ public class TestContainerHeartbeatMonitor {
     boolean fixedRateTaskCompleted = this.schedulerFixedRateExecutionLatch.await(2, TimeUnit.SECONDS);
     assertTrue("Did not complete heartbeat check", fixedRateTaskCompleted);
     // check that the shutdown task got submitted, but don't actually execute it since it will shut down the process
+    assertEquals("Heartbeat expired count should be 1", 1,
+        containerHeartbeatMonitor.getMetrics().getHeartbeatExpiredCount().getCount());
     verify(this.scheduler).schedule(any(Runnable.class), eq((long) ContainerHeartbeatMonitor.SHUTDOWN_TIMOUT_MS),
         eq(TimeUnit.MILLISECONDS));
     verify(this.onExpired).run();
@@ -97,6 +113,8 @@ public class TestContainerHeartbeatMonitor {
     // wait for the executor to finish the heartbeat check task
     boolean fixedRateTaskCompleted = this.schedulerFixedRateExecutionLatch.await(2, TimeUnit.SECONDS);
     assertTrue("Did not complete heartbeat check", fixedRateTaskCompleted);
+    assertEquals("Heartbeat expired count should be 0", 0,
+        containerHeartbeatMonitor.getMetrics().getHeartbeatExpiredCount().getCount());
     // shutdown task should not have been submitted
     verify(this.scheduler, never()).schedule(any(Runnable.class), anyLong(), any());
     verify(this.onExpired, never()).run();
@@ -107,20 +125,27 @@ public class TestContainerHeartbeatMonitor {
 
   @Test
   public void testReestablishConnectionWithNewAM() throws InterruptedException {
-    String containerExecutionId = "0";
     String newCoordinatorUrl = "http://some-host-2.prod.linkedin.com";
-    this.containerHeartbeatMonitor =
-        spy(new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
-            containerExecutionId, coordinatorStreamStore, true, 5, 10));
+    this.containerHeartbeatMonitor = spy(buildContainerHeartbeatMonitor(true));
     CoordinatorStreamValueSerde serde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+    ContainerHeartbeatMonitor.ContainerHeartbeatMetrics metrics = this.containerHeartbeatMonitor.getMetrics();
+
     when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(FAILURE_RESPONSE).thenReturn(SUCCESS_RESPONSE);
-    when(this.containerHeartbeatMonitor.createContainerHeartbeatClient(newCoordinatorUrl, containerExecutionId)).thenReturn(this.containerHeartbeatClient);
-    when(this.coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL)).thenReturn(serde.toBytes(newCoordinatorUrl));
+    when(this.containerHeartbeatMonitor
+        .createContainerHeartbeatClient(newCoordinatorUrl, CONTAINER_EXECUTION_ID))
+        .thenReturn(this.containerHeartbeatClient);
+    when(this.coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL))
+        .thenReturn(serde.toBytes(newCoordinatorUrl));
 
     this.containerHeartbeatMonitor.start();
     // wait for the executor to finish the heartbeat check task
     boolean fixedRateTaskCompleted = this.schedulerFixedRateExecutionLatch.await(2, TimeUnit.SECONDS);
     assertTrue("Did not complete heartbeat check", fixedRateTaskCompleted);
+    assertEquals("Heartbeat expired count should be 1", 1, metrics.getHeartbeatExpiredCount().getCount());
+    assertEquals("Heartbeat established failure count should be 0", 0,
+        metrics.getHeartbeatEstablishedFailureCount().getCount());
+    assertEquals("Heartbeat established with new AM should be 1", 1,
+        metrics.getHeartbeatEstablishedWithNewAmCount().getCount());
     // shutdown task should not have been submitted
     verify(this.scheduler, never()).schedule(any(Runnable.class), anyLong(), any());
     verify(this.onExpired, never()).run();
@@ -131,16 +156,22 @@ public class TestContainerHeartbeatMonitor {
 
   @Test
   public void testFailedToFetchNewAMCoordinatorUrl() throws InterruptedException {
-    this.containerHeartbeatMonitor =
-        spy(new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
-            "0", coordinatorStreamStore, true, 5, 10));
+    this.containerHeartbeatMonitor = spy(buildContainerHeartbeatMonitor(true));
     CoordinatorStreamValueSerde serde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+    ContainerHeartbeatMonitor.ContainerHeartbeatMetrics metrics = this.containerHeartbeatMonitor.getMetrics();
+
     when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(FAILURE_RESPONSE);
-    when(this.coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL)).thenReturn(serde.toBytes(COORDINATOR_URL));
+    when(this.coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL))
+        .thenReturn(serde.toBytes(COORDINATOR_URL));
+
     this.containerHeartbeatMonitor.start();
     // wait for the executor to finish the heartbeat check task
     boolean fixedRateTaskCompleted = this.schedulerFixedRateExecutionLatch.await(2, TimeUnit.SECONDS);
+
     assertTrue("Did not complete heartbeat check", fixedRateTaskCompleted);
+    assertEquals("Heartbeat expired count should be 1", 1, metrics.getHeartbeatExpiredCount().getCount());
+    assertEquals("Heartbeat established failure count should be 1", 1,
+        metrics.getHeartbeatEstablishedFailureCount().getCount());
     // shutdown task should have been submitted
     verify(this.scheduler).schedule(any(Runnable.class), eq((long) ContainerHeartbeatMonitor.SHUTDOWN_TIMOUT_MS),
         eq(TimeUnit.MILLISECONDS));
@@ -152,20 +183,25 @@ public class TestContainerHeartbeatMonitor {
 
   @Test
   public void testConnectToNewAMFailed() throws InterruptedException {
-    String containerExecutionId = "0";
     String newCoordinatorUrl = "http://some-host-2.prod.linkedin.com";
-    this.containerHeartbeatMonitor =
-        spy(new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
-            containerExecutionId, coordinatorStreamStore, true, 5, 10));
+    this.containerHeartbeatMonitor = spy(buildContainerHeartbeatMonitor(true));
     CoordinatorStreamValueSerde serde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+    ContainerHeartbeatMonitor.ContainerHeartbeatMetrics metrics = this.containerHeartbeatMonitor.getMetrics();
+
     when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(FAILURE_RESPONSE);
-    when(this.containerHeartbeatMonitor.createContainerHeartbeatClient(newCoordinatorUrl, containerExecutionId)).thenReturn(this.containerHeartbeatClient);
-    when(this.coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL)).thenReturn(serde.toBytes(newCoordinatorUrl));
+    when(this.containerHeartbeatMonitor.createContainerHeartbeatClient(newCoordinatorUrl, CONTAINER_EXECUTION_ID))
+        .thenReturn(this.containerHeartbeatClient);
+    when(this.coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL))
+        .thenReturn(serde.toBytes(newCoordinatorUrl));
 
     this.containerHeartbeatMonitor.start();
     // wait for the executor to finish the heartbeat check task
     boolean fixedRateTaskCompleted = this.schedulerFixedRateExecutionLatch.await(2, TimeUnit.SECONDS);
+
     assertTrue("Did not complete heartbeat check", fixedRateTaskCompleted);
+    assertEquals("Heartbeat expired count should be 1", 1, metrics.getHeartbeatExpiredCount().getCount());
+    assertEquals("Heartbeat established failure count should be 1", 1,
+        metrics.getHeartbeatEstablishedFailureCount().getCount());
     // shutdown task should have been submitted
     verify(this.scheduler).schedule(any(Runnable.class), eq((long) ContainerHeartbeatMonitor.SHUTDOWN_TIMOUT_MS),
         eq(TimeUnit.MILLISECONDS));
@@ -177,22 +213,28 @@ public class TestContainerHeartbeatMonitor {
 
   @Test
   public void testConnectToNewAMSerdeException() throws InterruptedException {
-    String containerExecutionId = "0";
     String newCoordinatorUrl = "http://some-host-2.prod.linkedin.com";
-    this.containerHeartbeatMonitor =
-        spy(new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
-            containerExecutionId, coordinatorStreamStore, true, 5, 10));
+    this.containerHeartbeatMonitor = spy(buildContainerHeartbeatMonitor(true));
     CoordinatorStreamValueSerde serde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+    ContainerHeartbeatMonitor.ContainerHeartbeatMetrics metrics = this.containerHeartbeatMonitor.getMetrics();
+
     when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(FAILURE_RESPONSE);
-    when(this.containerHeartbeatMonitor.createContainerHeartbeatClient(newCoordinatorUrl, containerExecutionId)).thenReturn(this.containerHeartbeatClient);
-    when(this.coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL)).thenThrow(new NullPointerException("serde failed"));
+    when(this.containerHeartbeatMonitor.createContainerHeartbeatClient(newCoordinatorUrl, CONTAINER_EXECUTION_ID))
+        .thenReturn(this.containerHeartbeatClient);
+    when(this.coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL))
+        .thenThrow(new NullPointerException("serde failed"));
 
     this.containerHeartbeatMonitor.start();
     // wait for the executor to finish the heartbeat check task
-    boolean fixedRateTaskCompleted = this.schedulerFixedRateExecutionLatch.await(2, TimeUnit.SECONDS);
+    boolean fixedRateTaskCompleted = this.schedulerFixedRateExecutionLatch.await(10, TimeUnit.SECONDS);
+
     assertTrue("Did not complete heartbeat check", fixedRateTaskCompleted);
+    assertEquals("Heartbeat expired count should be 1", 1, metrics.getHeartbeatExpiredCount().getCount());
+    assertEquals("Heartbeat established failure count should be 1", 1,
+        metrics.getHeartbeatEstablishedFailureCount().getCount());
     // shutdown task should have been submitted
-    verify(this.scheduler).schedule(any(Runnable.class), eq(0L), eq(TimeUnit.MILLISECONDS));
+    verify(this.scheduler).schedule(any(Runnable.class), eq((long) ContainerHeartbeatMonitor.SHUTDOWN_TIMOUT_MS),
+        eq(TimeUnit.MILLISECONDS));
     verify(this.onExpired).run();
 
     this.containerHeartbeatMonitor.stop();
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobCoordinatorMetadataManager.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobCoordinatorMetadataManager.java
index 70e65a3..d623aed 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobCoordinatorMetadataManager.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobCoordinatorMetadataManager.java
@@ -113,11 +113,14 @@ public class TestJobCoordinatorMetadataManager {
     JobCoordinatorMetadata newMetadataWithDifferentEpochId =
         new JobCoordinatorMetadata(NEW_EPOCH_ID, OLD_CONFIG_ID, OLD_JOB_MODEL_ID);
 
+    JobCoordinatorMetadataManager.JobCoordinatorMetadataManagerMetrics metrics =
+        jobCoordinatorMetadataManager.getMetrics();
+
     boolean metadataChanged =
         jobCoordinatorMetadataManager.checkForMetadataChanges(previousMetadata, newMetadataWithDifferentEpochId);
     assertTrue("Metadata check should return true", metadataChanged);
     assertEquals("New deployment should be 1 since Epoch ID changed", 1,
-        jobCoordinatorMetadataManager.getNewDeployment().getValue().intValue());
+        metrics.getNewDeployment().getValue().intValue());
 
     JobCoordinatorMetadata newMetadataWithDifferentConfigId =
         new JobCoordinatorMetadata(OLD_EPOCH_ID, NEW_CONFIG_ID, OLD_JOB_MODEL_ID);
@@ -125,7 +128,7 @@ public class TestJobCoordinatorMetadataManager {
         jobCoordinatorMetadataManager.checkForMetadataChanges(previousMetadata, newMetadataWithDifferentConfigId);
     assertTrue("Metadata check should return true", metadataChanged);
     assertEquals("Config across application attempts should be 1", 1,
-        jobCoordinatorMetadataManager.getConfigChangedAcrossApplicationAttempt().getValue().intValue());
+        metrics.getConfigChangedAcrossApplicationAttempt().getValue().intValue());
 
     JobCoordinatorMetadata newMetadataWithDifferentJobModelId =
         new JobCoordinatorMetadata(OLD_EPOCH_ID, OLD_CONFIG_ID, NEW_JOB_MODEL_ID);
@@ -133,18 +136,18 @@ public class TestJobCoordinatorMetadataManager {
         jobCoordinatorMetadataManager.checkForMetadataChanges(previousMetadata, newMetadataWithDifferentJobModelId);
     assertTrue("Metadata check should return true", metadataChanged);
     assertEquals("Job model changed across application attempts should be 1", 1,
-        jobCoordinatorMetadataManager.getJobModelChangedAcrossApplicationAttempt().getValue().intValue());
+        metrics.getJobModelChangedAcrossApplicationAttempt().getValue().intValue());
 
     JobCoordinatorMetadata newMetadataWithNoChange =
         new JobCoordinatorMetadata(OLD_EPOCH_ID, OLD_CONFIG_ID, OLD_JOB_MODEL_ID);
     assertEquals("Application attempt count should be 0", 0,
-        jobCoordinatorMetadataManager.getApplicationAttemptCount().getCount());
+        metrics.getApplicationAttemptCount().getCount());
 
     metadataChanged =
         jobCoordinatorMetadataManager.checkForMetadataChanges(previousMetadata, newMetadataWithNoChange);
     assertFalse("Metadata check should return false", metadataChanged);
     assertEquals("Application attempt count should be 1", 1,
-        jobCoordinatorMetadataManager.getApplicationAttemptCount().getCount());
+        metrics.getApplicationAttemptCount().getCount());
   }
 
   @Test
@@ -158,7 +161,7 @@ public class TestJobCoordinatorMetadataManager {
     } catch (Exception e) {
       assertTrue("Expecting SamzaException to be thrown", e instanceof SamzaException);
       assertEquals("Metadata generation failed count should be 1", 1,
-          jobCoordinatorMetadataManager.getMetadataGenerationFailedCount().getCount());
+          jobCoordinatorMetadataManager.getMetrics().getMetadataGenerationFailedCount().getCount());
     }
   }
 
@@ -208,7 +211,7 @@ public class TestJobCoordinatorMetadataManager {
     JobCoordinatorMetadata actualMetadata = jobCoordinatorMetadataManager.readJobCoordinatorMetadata();
     assertNull("Read failed should return null", actualMetadata);
     assertEquals("Metadata read failed count should be 1", 1,
-        jobCoordinatorMetadataManager.getMetadataReadFailedCount().getCount());
+        jobCoordinatorMetadataManager.getMetrics().getMetadataReadFailedCount().getCount());
   }
 
   @Test
@@ -237,7 +240,7 @@ public class TestJobCoordinatorMetadataManager {
     } catch (Exception e) {
       assertTrue("Expecting SamzaException to be thrown", e instanceof SamzaException);
       assertEquals("Metadata write failed count should be 1", 1,
-          jobCoordinatorMetadataManager.getMetadataWriteFailedCount().getCount());
+          jobCoordinatorMetadataManager.getMetrics().getMetadataWriteFailedCount().getCount());
     }
   }
 }
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index ccdd00f..c07a45b 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -220,6 +220,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
     nmClientAsync.init(yarnConfiguration);
     nmClientAsync.start();
     Set<ContainerId> previousAttemptsContainers = lifecycle.onInit();
+    metrics.setContainersFromPreviousAttempts(previousAttemptsContainers.size());
 
     if (new JobConfig(config).getApplicationMasterHighAvailabilityEnabled()) {
       log.info("Received running containers from previous attempt. Invoking launch success for them.");
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
index 712c985..b85e3c5 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
@@ -23,8 +23,7 @@ import org.apache.samza.clustermanager.SamzaApplicationState
 import org.apache.samza.config.{Config, MetricsConfig}
 import org.apache.samza.util.Logging
 import org.apache.samza.util.MetricsReporterLoader
-import org.apache.samza.metrics.ReadableMetricsRegistry
-import org.apache.samza.metrics.MetricsHelper
+import org.apache.samza.metrics.{Counter, MetricsHelper, ReadableMetricsRegistry}
 
 import scala.collection.JavaConverters._
 
@@ -42,9 +41,14 @@ class SamzaAppMasterMetrics(val config: Config,
   val registry: ReadableMetricsRegistry) extends MetricsHelper with Logging {
 
   private val metricsConfig = new MetricsConfig(config)
+  val containersFromPreviousAttempts = newCounter("container-from-previous-attempt")
   val reporters = MetricsReporterLoader.getMetricsReporters(metricsConfig, SamzaAppMasterMetrics.sourceName).asScala
   reporters.values.foreach(_.register(SamzaAppMasterMetrics.sourceName, registry))
 
+  def setContainersFromPreviousAttempts(containerCount: Int) {
+    containersFromPreviousAttempts.set(containerCount) // set (not inc): a "set" API must not accumulate on repeated calls
+  }
+
   def start() {
     val mRunningContainers = newGauge("running-containers", () => state.runningProcessors.size)
     val mNeededContainers = newGauge("needed-containers", () => state.neededProcessors.get())
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
index 89929f7..c7ae46f 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
@@ -201,8 +201,11 @@ public class TestYarnClusterResourceManager {
     verify(lifecycle).onInit();
     ArgumentCaptor<SamzaResource> samzaResourceArgumentCaptor = ArgumentCaptor.forClass(SamzaResource.class);
     verify(callback).onStreamProcessorLaunchSuccess(samzaResourceArgumentCaptor.capture());
+    ArgumentCaptor<Integer> containerFromPreviousAttemptCaptor = ArgumentCaptor.forClass(Integer.class);
+    verify(metrics).setContainersFromPreviousAttempts(containerFromPreviousAttemptCaptor.capture());
     SamzaResource samzaResource = samzaResourceArgumentCaptor.getValue();
     assertEquals(previousAttemptYarnContainerId, samzaResource.getContainerId());
+    assertEquals(1, containerFromPreviousAttemptCaptor.getValue().intValue());
   }
 
   @Test
diff --git a/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java b/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java
index 338b455..3090d70 100644
--- a/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java
+++ b/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java
@@ -108,7 +108,7 @@ public class TestApplicationMasterRestClient {
     assertTrue(metricsResult.containsKey(group));
 
     Map<String, Object> amMetricsGroup = metricsResult.get(group);
-    assertEquals(7, amMetricsGroup.size());
+    assertEquals(8, amMetricsGroup.size());
     assertEquals(samzaAppState.runningProcessors.size(),  amMetricsGroup.get("running-containers"));
     assertEquals(samzaAppState.neededProcessors.get(),    amMetricsGroup.get("needed-containers"));
     assertEquals(samzaAppState.completedProcessors.get(), amMetricsGroup.get("completed-containers"));
@@ -116,6 +116,7 @@ public class TestApplicationMasterRestClient {
     assertEquals(samzaAppState.releasedContainers.get(),  amMetricsGroup.get("released-containers"));
     assertEquals(samzaAppState.processorCount.get(),      amMetricsGroup.get("container-count"));
     assertEquals(samzaAppState.jobHealthy.get() ? 1 : 0,  amMetricsGroup.get("job-healthy"));
+    assertEquals(0, amMetricsGroup.get("container-from-previous-attempt"));
   }
 
   @Test