You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2020/11/25 02:54:34 UTC

[samza] branch master updated: SAMZA-2602: Dynamic heartbeat establish with new AM (#1442)

This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 3fd456a  SAMZA-2602: Dynamic heartbeat establish with new AM	 (#1442)
3fd456a is described below

commit 3fd456a0554d1c77a70a630b7aa73929f508a045
Author: lakshmi-manasa-g <mg...@linkedin.com>
AuthorDate: Tue Nov 24 18:53:50 2020 -0800

    SAMZA-2602: Dynamic heartbeat establish with new AM	 (#1442)
    
    - Introduce config for cluster based job coordinator high availability
    - Dynamic heartbeat establish with new ApplicationMaster
---
 .../versioned/jobs/samza-configurations.md         |  3 +
 .../java/org/apache/samza/config/JobConfig.java    | 28 ++++++++
 .../samza/container/ContainerHeartbeatMonitor.java | 71 ++++++++++++++++--
 .../apache/samza/runtime/ContainerLaunchUtil.java  | 21 ++++--
 .../container/TestContainerHeartbeatMonitor.java   | 83 +++++++++++++++++++++-
 5 files changed, 197 insertions(+), 9 deletions(-)

diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 932c4c8..1caf88e 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -324,6 +324,9 @@ Samza supports both standalone and clustered ([YARN](yarn-jobs.html)) [deploymen
 |job.container.count|1|The number of YARN containers to request for running your job. This is the main parameter for controlling the scale (allocated computing resources) of your job: to increase the parallelism of processing, you need to increase the number of containers. The minimum is one container, and the maximum number of containers is the number of task instances (usually the number of input stream partitions). Task instances are evenly distributed across the number of containers  [...]
 |cluster-manager.container.memory.mb|1024|How much memory, in megabytes, to request from the cluster manager per container of your job. Along with cluster-manager.container.cpu.cores, this property determines how many containers the cluster manager will run on one machine. If the container exceeds this limit, it will be killed, so it is important that the container's actual memory use remains below the limit. The amount of memory used is normally the JVM heap size (configured with task.o [...]
 |cluster-manager.container.cpu.cores|1|The number of CPU cores to request per container of your job. Each node in the cluster has a certain number of CPU cores available, so this number (along with cluster-manager.container.memory.mb) determines how many containers can be run on one machine.|
+|yarn.am.high-availability.enabled|false|If true, enables Job Coordinator (AM) high availability (HA) where a new AM can establish connection with already running containers.  
+|yarn.container.heartbeat.retry.count|5|If AM-HA is enabled, when a running container loses heartbeat with AM, this count gives the number of times an already running container will attempt to establish heartbeat with new AM|
+|yarn.container.heartbeat.retry-sleep-duration.ms|10000|If AM-HA is enabled, when a running container loses heartbeat with AM, this duration gives the amount of time a running container will sleep between attempts to establish heartbeat with new AM.|
 
 ##### <a name="advanced-cluster-configurations"></a>[5.1.1 Advanced Cluster Configurations](#advanced-cluster-configurations)
 |Name|Default|Description|
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
index 0b274e9..021c67d 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
@@ -147,6 +147,21 @@ public class JobConfig extends MapConfig {
 
   private static final String JOB_STARTPOINT_ENABLED = "job.startpoint.enabled";
 
+  // Enable ClusterBasedJobCoordinator aka ApplicationMaster High Availability (AM-HA).
+  // High availability allows new AM to establish connection with already running containers
+  public static final String YARN_AM_HIGH_AVAILABILITY_ENABLED = "yarn.am.high-availability.enabled";
+  public static final boolean YARN_AM_HIGH_AVAILABILITY_ENABLED_DEFAULT = false;
+
+  // If AM-HA is enabled, when a running container loses heartbeat with AM,
+  // this count gives the number of times an already running container will attempt to establish heartbeat with new AM.
+  public static final String YARN_CONTAINER_HEARTBEAT_RETRY_COUNT = "yarn.container.heartbeat.retry.count";
+  public static final long YARN_CONTAINER_HEARTBEAT_RETRY_COUNT_DEFAULT = 5;
+
+  // If AM-HA is enabled, when a running container loses heartbeat with AM,
+  // this duration gives the amount of time a running container will sleep between attempts to establish heartbeat with new AM.
+  public static final String YARN_CONTAINER_HEARTBEAT_RETRY_SLEEP_DURATION_MS = "yarn.container.heartbeat.retry-sleep-duration.ms";
+  public static final long YARN_CONTAINER_HEARTBEAT_RETRY_SLEEP_DURATION_MS_DEFAULT = 10000;
+
   public JobConfig(Config config) {
     super(config);
   }
@@ -398,6 +413,19 @@ public class JobConfig extends MapConfig {
     return get(COORDINATOR_STREAM_FACTORY, DEFAULT_COORDINATOR_STREAM_CONFIG_FACTORY);
   }
 
+  public boolean getApplicationMasterHighAvailabilityEnabled() {
+    return getBoolean(YARN_AM_HIGH_AVAILABILITY_ENABLED, YARN_AM_HIGH_AVAILABILITY_ENABLED_DEFAULT);
+  }
+
+  public long getContainerHeartbeatRetryCount() {
+    return getLong(YARN_CONTAINER_HEARTBEAT_RETRY_COUNT, YARN_CONTAINER_HEARTBEAT_RETRY_COUNT_DEFAULT);
+  }
+
+  public long getContainerHeartbeatRetrySleepDurationMs() {
+    return getLong(YARN_CONTAINER_HEARTBEAT_RETRY_SLEEP_DURATION_MS,
+        YARN_CONTAINER_HEARTBEAT_RETRY_SLEEP_DURATION_MS_DEFAULT);
+  }
+
   /**
    * Get config loader factory according to the configs
    * @return full qualified name of {@link ConfigLoaderFactory}
diff --git a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
index 89b5fc9..7e20f26 100644
--- a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
+++ b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
@@ -26,6 +26,10 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.samza.coordinator.CoordinationConstants;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
+import org.apache.samza.coordinator.stream.messages.SetConfig;
+import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.util.ThreadUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,26 +38,47 @@ import org.slf4j.LoggerFactory;
 public class ContainerHeartbeatMonitor {
   private static final Logger LOG = LoggerFactory.getLogger(ContainerHeartbeatMonitor.class);
   private static final ThreadFactory THREAD_FACTORY = new HeartbeatThreadFactory();
+  private static final CoordinatorStreamValueSerde SERDE = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+
   @VisibleForTesting
   static final int SCHEDULE_MS = 60000;
   @VisibleForTesting
   static final int SHUTDOWN_TIMOUT_MS = 120000;
 
   private final Runnable onContainerExpired;
-  private final ContainerHeartbeatClient containerHeartbeatClient;
   private final ScheduledExecutorService scheduler;
+  private final String containerExecutionId;
+  private final MetadataStore coordinatorStreamStore;
+  private final long sleepDurationForReconnectWithAM;
+  private final boolean isApplicationMasterHighAvailabilityEnabled;
+  private final long retryCount;
+
+  private ContainerHeartbeatClient containerHeartbeatClient;
+  private String coordinatorUrl;
   private boolean started = false;
 
-  public ContainerHeartbeatMonitor(Runnable onContainerExpired, ContainerHeartbeatClient containerHeartbeatClient) {
-    this(onContainerExpired, containerHeartbeatClient, Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY));
+  public ContainerHeartbeatMonitor(Runnable onContainerExpired, String coordinatorUrl, String containerExecutionId,
+      MetadataStore coordinatorStreamStore, boolean isApplicationMasterHighAvailabilityEnabled, long retryCount,
+      long sleepDurationForReconnectWithAM) {
+    this(onContainerExpired, new ContainerHeartbeatClient(coordinatorUrl, containerExecutionId),
+        Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY), coordinatorUrl, containerExecutionId,
+        coordinatorStreamStore, isApplicationMasterHighAvailabilityEnabled, retryCount, sleepDurationForReconnectWithAM);
   }
 
   @VisibleForTesting
   ContainerHeartbeatMonitor(Runnable onContainerExpired, ContainerHeartbeatClient containerHeartbeatClient,
-      ScheduledExecutorService scheduler) {
+      ScheduledExecutorService scheduler, String coordinatorUrl, String containerExecutionId,
+      MetadataStore coordinatorStreamStore, boolean isApplicationMasterHighAvailabilityEnabled,
+      long retryCount, long sleepDurationForReconnectWithAM) {
     this.onContainerExpired = onContainerExpired;
     this.containerHeartbeatClient = containerHeartbeatClient;
     this.scheduler = scheduler;
+    this.coordinatorUrl = coordinatorUrl;
+    this.containerExecutionId = containerExecutionId;
+    this.coordinatorStreamStore = coordinatorStreamStore;
+    this.isApplicationMasterHighAvailabilityEnabled = isApplicationMasterHighAvailabilityEnabled;
+    this.retryCount = retryCount;
+    this.sleepDurationForReconnectWithAM = sleepDurationForReconnectWithAM;
   }
 
   public void start() {
@@ -65,6 +90,12 @@ public class ContainerHeartbeatMonitor {
     scheduler.scheduleAtFixedRate(() -> {
       ContainerHeartbeatResponse response = containerHeartbeatClient.requestHeartbeat();
       if (!response.isAlive()) {
+        if (isApplicationMasterHighAvailabilityEnabled) {
+          LOG.warn("Failed to establish connection with {}. Checking for new AM", coordinatorUrl);
+          if (checkAndEstablishConnectionWithNewAM()) {
+            return;
+          }
+        }
         scheduler.schedule(() -> {
           // On timeout of container shutting down, force exit.
           LOG.error("Graceful shutdown timeout expired. Force exiting.");
@@ -84,6 +115,38 @@ public class ContainerHeartbeatMonitor {
     }
   }
 
+  private boolean checkAndEstablishConnectionWithNewAM() {
+    boolean response = false;
+    int attempt = 1;
+
+    while (attempt <= retryCount) {
+      String newCoordinatorUrl = SERDE.fromBytes(coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL));
+      try {
+        if (coordinatorUrl.equals(newCoordinatorUrl)) {
+          LOG.info("Attempt {} to discover new AM. Sleep for {}ms before next attempt.", attempt, sleepDurationForReconnectWithAM);
+          Thread.sleep(sleepDurationForReconnectWithAM);
+        } else {
+          LOG.info("Found new AM: {}. Establishing heartbeat with the new AM.", newCoordinatorUrl);
+          coordinatorUrl = newCoordinatorUrl;
+          containerHeartbeatClient = createContainerHeartbeatClient(coordinatorUrl, containerExecutionId);
+          response = containerHeartbeatClient.requestHeartbeat().isAlive();
+          LOG.info("Received heartbeat response: {} from new AM: {}", response, this.coordinatorUrl);
+          break;
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted during sleep.");
+        Thread.currentThread().interrupt();
+      }
+      attempt++;
+    }
+    return response;
+  }
+
+  @VisibleForTesting
+  ContainerHeartbeatClient createContainerHeartbeatClient(String coordinatorUrl, String containerExecutionId) {
+    return new ContainerHeartbeatClient(coordinatorUrl, containerExecutionId);
+  }
+
   private static class HeartbeatThreadFactory implements ThreadFactory {
     private static final String PREFIX = "Samza-" + ContainerHeartbeatMonitor.class.getSimpleName() + "-";
     private static final AtomicInteger INSTANCE_NUM = new AtomicInteger();
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index a5148fb..ec477c9 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -30,7 +30,6 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.config.ShellCommandConfig;
-import org.apache.samza.container.ContainerHeartbeatClient;
 import org.apache.samza.container.ContainerHeartbeatMonitor;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.SamzaContainer;
@@ -39,9 +38,11 @@ import org.apache.samza.context.ExternalContext;
 import org.apache.samza.context.JobContextImpl;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetConfig;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.diagnostics.DiagnosticsManager;
 import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
@@ -142,7 +143,12 @@ public class ContainerLaunchUtil {
           listener = new ClusterBasedProcessorLifecycleListener(config, processorLifecycleListener, container::shutdown);
       container.setContainerListener(listener);
 
-      ContainerHeartbeatMonitor heartbeatMonitor = createContainerHeartbeatMonitor(container);
+      JobConfig jobConfig = new JobConfig(config);
+      ContainerHeartbeatMonitor heartbeatMonitor =
+          createContainerHeartbeatMonitor(container,
+              new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetConfig.TYPE),
+              jobConfig.getApplicationMasterHighAvailabilityEnabled(), jobConfig.getContainerHeartbeatRetryCount(),
+              jobConfig.getContainerHeartbeatRetrySleepDurationMs());
       if (heartbeatMonitor != null) {
         heartbeatMonitor.start();
       }
@@ -189,9 +195,15 @@ public class ContainerLaunchUtil {
   /**
    * Creates a new container heartbeat monitor if possible.
    * @param container the container to monitor
+   * @param coordinatorStreamStore the metadata store to fetch coordinator url from
+   * @param isApplicaitonMasterHighAvailabilityEnabled whether AM HA is enabled to fetch new AM url
+   * @param retryCount number of times to retry connecting to new AM when heartbeat expires
+   * @param sleepDurationForReconnectWithAM sleep duration between retries to connect to new AM when heartbeat expires
    * @return a new {@link ContainerHeartbeatMonitor} instance, or null if could not create one
    */
-  private static ContainerHeartbeatMonitor createContainerHeartbeatMonitor(SamzaContainer container) {
+  private static ContainerHeartbeatMonitor createContainerHeartbeatMonitor(SamzaContainer container,
+      MetadataStore coordinatorStreamStore, boolean isApplicaitonMasterHighAvailabilityEnabled, long retryCount,
+      long sleepDurationForReconnectWithAM) {
     String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL);
     String executionEnvContainerId = System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID);
     if (executionEnvContainerId != null) {
@@ -204,7 +216,8 @@ public class ContainerLaunchUtil {
           log.error("Heartbeat monitor failed to shutdown the container gracefully. Exiting process.", e);
           System.exit(1);
         }
-      }, new ContainerHeartbeatClient(coordinatorUrl, executionEnvContainerId));
+      }, coordinatorUrl, executionEnvContainerId, coordinatorStreamStore, isApplicaitonMasterHighAvailabilityEnabled,
+          retryCount, sleepDurationForReconnectWithAM);
     } else {
       log.warn("Execution environment container id not set. Container heartbeat monitor will not be created");
       return null;
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java b/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
index 2445c00..ff297d1 100644
--- a/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
+++ b/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
@@ -22,6 +22,10 @@ package org.apache.samza.container;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import org.apache.samza.coordinator.CoordinationConstants;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
+import org.apache.samza.coordinator.stream.messages.SetConfig;
+import org.apache.samza.metadatastore.MetadataStore;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
@@ -33,6 +37,7 @@ import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -42,6 +47,8 @@ public class TestContainerHeartbeatMonitor {
   private Runnable onExpired;
   @Mock
   private ContainerHeartbeatClient containerHeartbeatClient;
+  @Mock
+  private MetadataStore coordinatorStreamStore;
 
   private ScheduledExecutorService scheduler;
   /**
@@ -51,13 +58,18 @@ public class TestContainerHeartbeatMonitor {
 
   private ContainerHeartbeatMonitor containerHeartbeatMonitor;
 
+  private static final String COORDINATOR_URL = "http://some-host.prod.linkedin.com";
+  private static final ContainerHeartbeatResponse FAILURE_RESPONSE = new ContainerHeartbeatResponse(false);
+  private static final ContainerHeartbeatResponse SUCCESS_RESPONSE = new ContainerHeartbeatResponse(true);
+
   @Before
   public void setup() {
     MockitoAnnotations.initMocks(this);
     this.schedulerFixedRateExecutionLatch = new CountDownLatch(1);
     this.scheduler = buildScheduledExecutorService(this.schedulerFixedRateExecutionLatch);
     this.containerHeartbeatMonitor =
-        new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler);
+        new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
+            "0", coordinatorStreamStore, false, 5, 10);
   }
 
   @Test
@@ -93,6 +105,75 @@ public class TestContainerHeartbeatMonitor {
     verify(this.scheduler).shutdown();
   }
 
+  @Test
+  public void testReestablishConnectionWithNewAM() throws InterruptedException {
+    String containerExecutionId = "0";
+    String newCoordinatorUrl = "http://some-host-2.prod.linkedin.com";
+    this.containerHeartbeatMonitor =
+        spy(new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
+            containerExecutionId, coordinatorStreamStore, true, 5, 10));
+    CoordinatorStreamValueSerde serde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+    when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(FAILURE_RESPONSE).thenReturn(SUCCESS_RESPONSE);
+    when(this.containerHeartbeatMonitor.createContainerHeartbeatClient(newCoordinatorUrl, containerExecutionId)).thenReturn(this.containerHeartbeatClient);
+    when(this.coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL)).thenReturn(serde.toBytes(newCoordinatorUrl));
+
+    this.containerHeartbeatMonitor.start();
+    // wait for the executor to finish the heartbeat check task
+    boolean fixedRateTaskCompleted = this.schedulerFixedRateExecutionLatch.await(2, TimeUnit.SECONDS);
+    assertTrue("Did not complete heartbeat check", fixedRateTaskCompleted);
+    // shutdown task should not have been submitted
+    verify(this.scheduler, never()).schedule(any(Runnable.class), anyLong(), any());
+    verify(this.onExpired, never()).run();
+
+    this.containerHeartbeatMonitor.stop();
+    verify(this.scheduler).shutdown();
+  }
+
+  @Test
+  public void testFailedToFetchNewAMCoordinatorUrl() throws InterruptedException {
+    this.containerHeartbeatMonitor =
+        spy(new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
+            "0", coordinatorStreamStore, true, 5, 10));
+    CoordinatorStreamValueSerde serde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+    when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(FAILURE_RESPONSE);
+    when(this.coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL)).thenReturn(serde.toBytes(COORDINATOR_URL));
+    this.containerHeartbeatMonitor.start();
+    // wait for the executor to finish the heartbeat check task
+    boolean fixedRateTaskCompleted = this.schedulerFixedRateExecutionLatch.await(2, TimeUnit.SECONDS);
+    assertTrue("Did not complete heartbeat check", fixedRateTaskCompleted);
+    // shutdown task should have been submitted
+    verify(this.scheduler).schedule(any(Runnable.class), eq((long) ContainerHeartbeatMonitor.SHUTDOWN_TIMOUT_MS),
+        eq(TimeUnit.MILLISECONDS));
+    verify(this.onExpired).run();
+
+    this.containerHeartbeatMonitor.stop();
+    verify(this.scheduler).shutdown();
+  }
+
+  @Test
+  public void testConnectToNewAMFailed() throws InterruptedException {
+    String containerExecutionId = "0";
+    String newCoordinatorUrl = "http://some-host-2.prod.linkedin.com";
+    this.containerHeartbeatMonitor =
+        spy(new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
+            containerExecutionId, coordinatorStreamStore, true, 5, 10));
+    CoordinatorStreamValueSerde serde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+    when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(FAILURE_RESPONSE);
+    when(this.containerHeartbeatMonitor.createContainerHeartbeatClient(newCoordinatorUrl, containerExecutionId)).thenReturn(this.containerHeartbeatClient);
+    when(this.coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL)).thenReturn(serde.toBytes(newCoordinatorUrl));
+
+    this.containerHeartbeatMonitor.start();
+    // wait for the executor to finish the heartbeat check task
+    boolean fixedRateTaskCompleted = this.schedulerFixedRateExecutionLatch.await(2, TimeUnit.SECONDS);
+    assertTrue("Did not complete heartbeat check", fixedRateTaskCompleted);
+    // shutdown task should have been submitted
+    verify(this.scheduler).schedule(any(Runnable.class), eq((long) ContainerHeartbeatMonitor.SHUTDOWN_TIMOUT_MS),
+        eq(TimeUnit.MILLISECONDS));
+    verify(this.onExpired).run();
+
+    this.containerHeartbeatMonitor.stop();
+    verify(this.scheduler).shutdown();
+  }
   /**
    * Build a mock {@link ScheduledExecutorService} which will execute a fixed-rate task once. It will count down on
    * {@code schedulerFixedRateExecutionLatch} when the task is finished executing.