You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2019/03/04 20:35:09 UTC

[samza] branch master updated: SEP-19: Allocator changes for standby-aware container allocation, and active container failover

This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new e2b9a76  SEP-19: Allocator changes for standby-aware container allocation, and active container failover
e2b9a76 is described below

commit e2b9a76fcba7488a4457907d3915c9a3cbf7cf51
Author: Ray Matharu <rm...@linkedin.com>
AuthorDate: Mon Mar 4 12:35:05 2019 -0800

    SEP-19: Allocator changes for standby-aware container allocation, and active container failover
    
    This PR makes the following changes:
    
    * Adds a map called standbyContainerConstraints which stores standby constraints for each container in the job model. The logic for populating the map from the job model is added in a util class (StandbyTaskUtil).
    
    * Adds a check before runStreamProcessor, in which the standbyContainerConstraints are checked before launching; if the check succeeds, the container is launched. If standby tasks are not enabled in the config, the existing behaviour is retained.
    
    * Adds logic to handle standbyContainerConstraints check failures, which entails releasing the resource and making an any-host request for the container.
    
    * Adds logic in SamzaResourceRequest to order container requests such that active containers take precedence over standby containers; otherwise, requests are ordered by request timestamp.
    
    * Adds logic in HostAwareContainerAllocator to intercept resource requests issued by the CPM and the HACA, and:
    a. if it is an any-host request for an active container, translate it into a stop of a standby container.
    b. if it is for a standby container stopped by us, translate it into a resource request for the active container on the standby's host and for the standby container on any host.
    c. in all other cases, proceed as is.
    
    * Adds metrics to capture FailedStandbyAllocations (due to standby constraints), successful standby allocations, the number of failovers of an active container to a valid standby, the number of failovers of an active container to any host (when no standby was found), and the number of standby-container stops completed.
    
    Tested on a dev setup, a VPC, and a test cluster.
    
    Author: Ray Matharu <rm...@linkedin.com>
    Author: rmatharu <40...@users.noreply.github.com>
    
    Reviewers: Jagadish Venkatraman <vj...@gmail.com>
    
    Closes #903 from rmatharu/test-basicStandbyFailover
---
 .../clustermanager/AbstractContainerAllocator.java |  12 +-
 .../clustermanager/ClusterResourceManager.java     |   8 +
 .../clustermanager/ContainerProcessManager.java    |  47 +-
 .../HostAwareContainerAllocator.java               |  33 +-
 .../samza/clustermanager/ResourceRequestState.java |  24 +-
 .../clustermanager/SamzaApplicationState.java      |  23 +-
 .../samza/clustermanager/SamzaResourceRequest.java |  12 +-
 .../clustermanager/StandbyContainerManager.java    | 516 +++++++++++++++++++++
 .../samza/clustermanager/StandbyTaskUtil.java      | 121 +++++
 .../grouper/task/TaskNameGrouperProxy.java         |  21 +-
 .../apache/samza/storage/StorageManagerUtil.java   |   9 +-
 .../metrics/ContainerProcessManagerMetrics.scala   |   4 +
 .../clustermanager/MockClusterResourceManager.java |   5 +
 .../clustermanager/MockContainerRequestState.java  |   4 +-
 .../MockHostAwareContainerAllocator.java           |   3 +-
 .../TestContainerProcessManager.java               |   3 +-
 .../TestHostAwareContainerAllocator.java           |   3 +-
 .../samza/clustermanager/TestStandbyAllocator.java | 126 +++++
 .../samza/job/yarn/YarnClusterResourceManager.java |  26 +-
 19 files changed, 943 insertions(+), 57 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
index 5547a32..7adb1cc 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
@@ -194,11 +194,19 @@ public abstract class AbstractContainerAllocator implements Runnable {
    * @param preferredHost Name of the host that you prefer to run the container on
    */
   public final void requestResource(String containerID, String preferredHost) {
-    SamzaResourceRequest request = new SamzaResourceRequest(this.containerNumCpuCores, this.containerMemoryMb,
+    SamzaResourceRequest request = getResourceRequest(containerID, preferredHost);
+    issueResourceRequest(request);
+  }
+
+  public final SamzaResourceRequest getResourceRequest(String containerID, String preferredHost) {
+    return new SamzaResourceRequest(this.containerNumCpuCores, this.containerMemoryMb,
         preferredHost, containerID);
+  }
+
+  public final void issueResourceRequest(SamzaResourceRequest request) {
     resourceRequestState.addResourceRequest(request);
     state.containerRequests.incrementAndGet();
-    if (ResourceRequestState.ANY_HOST.equals(preferredHost)) {
+    if (ResourceRequestState.ANY_HOST.equals(request.getPreferredHost())) {
       state.anyHostRequests.incrementAndGet();
     } else {
       state.preferredHostRequests.incrementAndGet();
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
index f8a8c8b..19f3ef0 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
@@ -121,6 +121,14 @@ public abstract class ClusterResourceManager {
    */
   public abstract void launchStreamProcessor(SamzaResource resource, CommandBuilder builder);
 
+  /**
+   * Requests the stopping of a StreamProcessor, identified by the given resource.
+   * {@link Callback#onResourcesCompleted(List)} will be invoked to indicate the completion of this operation.
+   *
+   * @param resource the resource being used for the StreamProcessor.
+   */
+  public abstract void stopStreamProcessor(SamzaResource resource);
+
 
   public abstract void stop(SamzaApplicationState.SamzaAppStatus status);
 
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index a089ed9..e63b425 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.clustermanager;
 
+import java.util.Optional;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
@@ -77,6 +78,9 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
   private final AbstractContainerAllocator containerAllocator;
   private final Thread allocatorThread;
 
+  // The StandbyContainerManager manages standby-aware allocation and failover of containers
+  private final Optional<StandbyContainerManager> standbyContainerManager;
+
   /**
    * A standard interface to request resources.
    */
@@ -115,6 +119,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
     this.metrics = new ContainerProcessManagerMetrics(config, state, registry);
     this.containerAllocator = allocator;
     this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
+    this.standbyContainerManager = Optional.empty();
   }
 
   public ContainerProcessManager(Config config,
@@ -130,8 +135,14 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
     this.clusterResourceManager = checkNotNull(factory.getClusterResourceManager(this, state));
     this.metrics = new ContainerProcessManagerMetrics(config, state, registry);
 
+    if (jobConfig.getStandbyTasksEnabled()) {
+      this.standbyContainerManager = Optional.of(new StandbyContainerManager(state, clusterResourceManager));
+    } else {
+      this.standbyContainerManager = Optional.empty();
+    }
+
     if (this.hostAffinityEnabled) {
-      this.containerAllocator = new HostAwareContainerAllocator(clusterResourceManager, clusterManagerConfig.getContainerRequestTimeout(), config, state);
+      this.containerAllocator = new HostAwareContainerAllocator(clusterResourceManager, clusterManagerConfig.getContainerRequestTimeout(), config, standbyContainerManager, state);
     } else {
       this.containerAllocator = new ContainerAllocator(clusterResourceManager, config, state);
     }
@@ -155,10 +166,10 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
 
     this.clusterResourceManager = resourceManager;
     this.metrics = new ContainerProcessManagerMetrics(config, state, registry);
-
+    this.standbyContainerManager = Optional.empty();
 
     if (this.hostAffinityEnabled) {
-      this.containerAllocator = new HostAwareContainerAllocator(clusterResourceManager, clusterManagerConfig.getContainerRequestTimeout(), config, state);
+      this.containerAllocator = new HostAwareContainerAllocator(clusterResourceManager, clusterManagerConfig.getContainerRequestTimeout(), config, this.standbyContainerManager, state);
     } else {
       this.containerAllocator = new ContainerAllocator(clusterResourceManager, config, state);
     }
@@ -185,14 +196,12 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
     clusterResourceManager.start();
 
     log.info("Starting the Samza task manager");
-    final int containerCount = jobConfig.getContainerCount();
 
-    state.containerCount.set(containerCount);
-    state.neededContainers.set(containerCount);
+    state.containerCount.set(state.jobModelManager.jobModel().getContainers().size());
+    state.neededContainers.set(state.jobModelManager.jobModel().getContainers().size());
 
     // Request initial set of containers
     Map<String, String> containerToHostMapping = state.jobModelManager.jobModel().getAllContainerLocality();
-
     containerAllocator.requestResources(containerToHostMapping);
 
     // Start container allocator thread
@@ -297,8 +306,8 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
         state.neededContainers.incrementAndGet();
         state.jobHealthy.set(false);
 
-          // request a container on new host
-        containerAllocator.requestResource(containerId, ResourceRequestState.ANY_HOST);
+        // handle container stop due to node fail
+        this.handleContainerStop(containerId, containerStatus.getResourceID(), ResourceRequestState.ANY_HOST, exitStatus);
         break;
 
       default:
@@ -371,9 +380,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
         }
 
         if (!tooManyFailedContainers) {
-          log.info("Requesting a new container ");
-          // Request a new container
-          containerAllocator.requestResource(containerId, lastSeenOn);
+          handleContainerStop(containerId, containerStatus.getResourceID(), lastSeenOn, exitStatus);
         }
 
     }
@@ -428,8 +435,11 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
     String containerId = getPendingContainerId(resource.getResourceID());
     log.info("Failed container ID: {} for resourceId: {}", containerId, resource.getResourceID());
 
-    // 3. Re-request resources on ANY_HOST in case of launch failures on the preferred host.
-    if (containerId != null) {
+    // 3. Re-request resources on ANY_HOST in case of launch failures on the preferred host, if standby tasks are not enabled;
+    // otherwise delegate the handling to the standbyContainerManager
+    if (containerId != null && standbyContainerManager.isPresent()) {
+      this.standbyContainerManager.get().handleContainerLaunchFail(containerId, resource.getResourceID(), containerAllocator);
+    } else if (containerId != null) {
       log.info("Launch of container ID: {} failed on host: {}. Falling back to ANY_HOST", containerId, resource.getHost());
       containerAllocator.requestResource(containerId, ResourceRequestState.ANY_HOST);
     } else {
@@ -484,5 +494,12 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
     return null;
   }
 
-
+  private void handleContainerStop(String containerID, String resourceID, String preferredHost, int exitStatus) {
+    if (standbyContainerManager.isPresent()) {
+      standbyContainerManager.get().handleContainerStop(containerID, resourceID, preferredHost, exitStatus, containerAllocator);
+    } else {
+      // If StandbyTasks are not enabled, we simply make a request for the preferredHost
+      containerAllocator.requestResource(containerID, preferredHost);
+    }
+  }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
index 27d1caa..6bfa1a6 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
@@ -18,11 +18,14 @@
  */
 package org.apache.samza.clustermanager;
 
+
+import java.util.Optional;
 import java.util.Map;
 import org.apache.samza.config.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * This is the allocator thread that will be used by ContainerProcessManager when host-affinity is enabled for a job. It is similar
  * to {@link ContainerAllocator}, except that it considers locality for allocation.
@@ -45,11 +48,13 @@ public class HostAwareContainerAllocator extends AbstractContainerAllocator {
    * Tracks the expiration of a request for resources.
    */
   private final int requestTimeout;
+  private final Optional<StandbyContainerManager> standbyContainerManager;
 
   public HostAwareContainerAllocator(ClusterResourceManager manager,
-                                     int timeout, Config config, SamzaApplicationState state) {
+      int timeout, Config config, Optional<StandbyContainerManager> standbyContainerManager, SamzaApplicationState state) {
     super(manager, new ResourceRequestState(true, manager), config, state);
     this.requestTimeout = timeout;
+    this.standbyContainerManager = standbyContainerManager;
   }
 
   /**
@@ -70,7 +75,8 @@ public class HostAwareContainerAllocator extends AbstractContainerAllocator {
       if (hasAllocatedResource(preferredHost)) {
         // Found allocated container at preferredHost
         log.info("Found a matched-container {} on the preferred host. Running on {}", containerID, preferredHost);
-        runStreamProcessor(request, preferredHost);
+        // Try to launch streamProcessor on this preferredHost if all standby constraints are met
+        checkStandbyConstraintsAndRunStreamProcessor(request, preferredHost, peekAllocatedResource(preferredHost));
         state.matchedResourceRequests.incrementAndGet();
       } else {
         log.info("Did not find any allocated resources on preferred host {} for running container id {}",
@@ -81,14 +87,21 @@ public class HostAwareContainerAllocator extends AbstractContainerAllocator {
 
         if (expired) {
           updateExpiryMetrics(request);
-          if (resourceAvailableOnAnyHost) {
+
+          if (standbyContainerManager.isPresent()) {
+            standbyContainerManager.get().handleExpiredResourceRequest(containerID, request,
+                Optional.ofNullable(peekAllocatedResource(ResourceRequestState.ANY_HOST)), this, resourceRequestState);
+
+          } else if (resourceAvailableOnAnyHost) {
             log.info("Request for container: {} on {} has expired. Running on ANY_HOST", request.getContainerID(), request.getPreferredHost());
             runStreamProcessor(request, ResourceRequestState.ANY_HOST);
+
           } else {
             log.info("Request for container: {} on {} has expired. Requesting additional resources on ANY_HOST.", request.getContainerID(), request.getPreferredHost());
             resourceRequestState.cancelResourceRequest(request);
             requestResource(containerID, ResourceRequestState.ANY_HOST);
           }
+
         } else {
           log.info("Request for container: {} on {} has not yet expired. Request creation time: {}. Request timeout: {}",
               new Object[]{request.getContainerID(), request.getPreferredHost(), request.getRequestTimestampMs(), requestTimeout});
@@ -98,6 +111,7 @@ public class HostAwareContainerAllocator extends AbstractContainerAllocator {
     }
   }
 
+
   /**
    * Since host-affinity is enabled, all container processes will be requested on their preferred host. If the job is
    * run for the first time, it will get matched to any available host.
@@ -142,4 +156,15 @@ public class HostAwareContainerAllocator extends AbstractContainerAllocator {
       state.expiredPreferredHostRequests.incrementAndGet();
     }
   }
-}
+
+  private void checkStandbyConstraintsAndRunStreamProcessor(SamzaResourceRequest request, String preferredHost, SamzaResource samzaResource) {
+    // If standby tasks are not enabled run streamprocessor on the given host
+    if (!this.standbyContainerManager.isPresent()) {
+      runStreamProcessor(request, preferredHost);
+      return;
+    }
+
+    this.standbyContainerManager.get().checkStandbyConstraintsAndRunStreamProcessor(request, preferredHost,
+        samzaResource, this, resourceRequestState);
+  }
+}
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
index aa7a509..0296d4c 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
@@ -152,8 +152,8 @@ public class ResourceRequestState {
                * assigned to ANY_HOST
                */
               log.info("The number of containers already allocated on {} is greater than what was " +
-                              "requested, which is {}. Hence, saving the samzaResource {} in the buffer for ANY_HOST",
-                      new Object[]{hostName, requestCountOnThisHost, samzaResource.getResourceID()});
+                      "requested, which is {}. Hence, saving the samzaResource {} in the buffer for ANY_HOST",
+                  new Object[]{hostName, requestCountOnThisHost, samzaResource.getResourceID()});
               addToAllocatedResourceList(ANY_HOST, samzaResource);
             }
           }
@@ -238,9 +238,21 @@ public class ResourceRequestState {
    *
    * @param resource the {@link SamzaResource} to release.
    */
-  public void releaseUnstartableContainer(SamzaResource resource) {
-    log.info("Releasing unstartable container {}", resource.getResourceID());
-    manager.releaseResources(resource);
+  public void releaseUnstartableContainer(SamzaResource resource, String preferredHost) {
+    synchronized (lock) {
+      log.info("Releasing unstartable container {} on host {}", resource.getResourceID(), resource.getHost());
+      manager.releaseResources(resource);
+
+      // A reference for the resource could either be held in the preferred host buffer or in the ANY_HOST buffer.
+      if (allocatedResources.get(preferredHost) != null) {
+        allocatedResources.get(preferredHost).remove(resource);
+        log.info("Resource {} removed from allocated resource buffer for host {}", resource.getResourceID(), preferredHost);
+      }
+      if (allocatedResources.get(ANY_HOST) != null) {
+        allocatedResources.get(ANY_HOST).remove(resource);
+        log.info("Resource {} removed from allocated resource buffer for host {}", resource.getResourceID(), ANY_HOST);
+      }
+    }
   }
 
 
@@ -366,4 +378,4 @@ public class ResourceRequestState {
   }
 
 
-}
+}
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
index 4e6fc33..2a7b6a1 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
@@ -78,7 +78,7 @@ public class SamzaApplicationState {
   public final AtomicInteger releasedContainers = new AtomicInteger(0);
 
   /**
-   * ContainerStatus of failed containers.
+   * ContainerStatuses of failed containers.
    */
   public final ConcurrentMap<String, SamzaResourceStatus> failedContainersStatus = new ConcurrentHashMap<String, SamzaResourceStatus>();
 
@@ -110,7 +110,7 @@ public class SamzaApplicationState {
    */
   public final ConcurrentMap<String, SamzaResource> runningContainers = new ConcurrentHashMap<String, SamzaResource>(0);
 
-  /**
+   /**
    * Final status of the application. Made to be volatile s.t. changes will be visible in multiple threads.
    */
   public volatile SamzaAppStatus status = SamzaAppStatus.UNDEFINED;
@@ -141,6 +141,25 @@ public class SamzaApplicationState {
    */
   public final AtomicInteger redundantNotifications = new AtomicInteger(0);
 
+  /**
+   * Number of container allocations from the RM, that did not meet standby container constraints, in which case the
+   * existing resource was given back to the RM, and a new ANY-HOST request had to be made.
+   */
+  public final AtomicInteger failedStandbyAllocations = new AtomicInteger(0);
+
+  /**
+   * Number of occurrences in which a failover of an active container was initiated (due to a node failure), in which a
+   * running standby container was available for the failover.
+   * If two standby containers were used for one failing active, it counts as two.
+   */
+  public final AtomicInteger failoversToStandby = new AtomicInteger(0);
+
+  /**
+   * Number of occurrences in which a failover of an active container was initiated (due to a node failure), in which no
+   * running standby container was available for the failover.
+  */
+  public final AtomicInteger failoversToAnyHost = new AtomicInteger(0);
+
   public SamzaApplicationState(JobModelManager jobModelManager) {
     this.jobModelManager = jobModelManager;
   }
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
index 4159ff2..df70459 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
@@ -109,11 +109,21 @@ public class SamzaResourceRequest implements Comparable<SamzaResourceRequest> {
   }
 
   /**
-   * Requests are ordered by the time at which they were created.
+   * Requests are ordered by the container type and the time at which they were created.
+   * Active containers take precedence over standby containers, regardless of timestamp.
    * @param o the other
    */
   @Override
   public int compareTo(SamzaResourceRequest o) {
+
+    if (!StandbyTaskUtil.isStandbyContainer(this.containerID) && StandbyTaskUtil.isStandbyContainer(o.containerID)) {
+      return -1;
+    }
+
+    if (StandbyTaskUtil.isStandbyContainer(this.containerID) && !StandbyTaskUtil.isStandbyContainer(o.containerID)) {
+      return 1;
+    }
+
     if (this.requestTimestampMs < o.requestTimestampMs)
       return -1;
     if (this.requestTimestampMs > o.requestTimestampMs)
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
new file mode 100644
index 0000000..db27238
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.storage.kv.Entry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Encapsulates logic and state concerning standby-containers.
+ */
+public class StandbyContainerManager {
+
+  private static final Logger log = LoggerFactory.getLogger(StandbyContainerManager.class);
+
+  private final SamzaApplicationState samzaApplicationState;
+
+  // Map of samza containerIDs to their corresponding active and standby containers, e.g., 0 -> {0-0, 0-1}, 0-0 -> {0, 0-1}
+  // This is used for checking no two standbys or active-standby-pair are started on the same host
+  private final Map<String, List<String>> standbyContainerConstraints;
+
+  // Map of active containers that are in failover, indexed by the active container's resourceID (at the time of failure)
+  private final Map<String, FailoverMetadata> failovers;
+
+  // Resource-manager, used to stop containers
+  private ClusterResourceManager clusterResourceManager;
+
+  public StandbyContainerManager(SamzaApplicationState samzaApplicationState,
+      ClusterResourceManager clusterResourceManager) {
+    this.failovers = new ConcurrentHashMap<>();
+    this.standbyContainerConstraints = new HashMap<>();
+    this.samzaApplicationState = samzaApplicationState;
+    JobModel jobModel = samzaApplicationState.jobModelManager.jobModel();
+
+    // populate the standbyContainerConstraints map by iterating over all containers
+    jobModel.getContainers()
+        .keySet()
+        .forEach(containerId -> standbyContainerConstraints.put(containerId,
+            StandbyTaskUtil.getStandbyContainerConstraints(containerId, jobModel)));
+    this.clusterResourceManager = clusterResourceManager;
+
+    log.info("Populated standbyContainerConstraints map {}", standbyContainerConstraints);
+  }
+
+  /**
+   * We handle the stopping of a container depending on the case which is decided using the exit-status:
+   *    Case 1. an Active-Container which has stopped for an "unknown" reason, then we start it on the given preferredHost (but we record the resource-request)
+   *    Case 2. Active container has stopped because of a node failure (exit status DISK_FAIL, ABORTED or PREEMPTED), then we initiate a failover
+   *    Case 3. StandbyContainer has stopped after it was chosen for failover, see {@link StandbyContainerManager#handleStandbyContainerStop}
+   *    Case 4. StandbyContainer has stopped but not because of a failover, see {@link StandbyContainerManager#handleStandbyContainerStop}
+   *
+   * @param containerID containerID of the stopped container
+   * @param resourceID last resourceID of the stopped container
+   * @param preferredHost the host on which the container was running
+   * @param exitStatus the exit status of the failed container
+   * @param containerAllocator the container allocator
+   */
+  public void handleContainerStop(String containerID, String resourceID, String preferredHost, int exitStatus,
+      AbstractContainerAllocator containerAllocator) {
+
+    if (StandbyTaskUtil.isStandbyContainer(containerID)) {
+      handleStandbyContainerStop(containerID, resourceID, preferredHost, containerAllocator);
+    } else {
+      // initiate failover for the active container based on the exitStatus
+      switch (exitStatus) {
+        case SamzaResourceStatus.DISK_FAIL:
+        case SamzaResourceStatus.ABORTED:
+        case SamzaResourceStatus.PREEMPTED:
+          initiateActiveContainerFailover(containerID, resourceID, containerAllocator);
+          break;
+      // in all other cases, request-resource for the failed container, but record the resource-request, so that
+      // if this request expires, we can do a failover -- select a standby to stop & start the active on standby's host
+        default:
+          log.info("Requesting resource for active-container {} on host {}", containerID, preferredHost);
+          SamzaResourceRequest resourceRequest = containerAllocator.getResourceRequest(containerID, preferredHost);
+          FailoverMetadata failoverMetadata = registerActiveContainerFailure(containerID, resourceID);
+          failoverMetadata.recordResourceRequest(resourceRequest);
+          containerAllocator.issueResourceRequest(resourceRequest);
+          break;
+      }
+    }
+  }
+
+  /**
+   * Handle the failed launch of a container, based on
+   *    Case 1. If it is an active container, then initiate a failover for it.
+   *    Case 2. If it is standby container, request a new resource on AnyHost.
+   * @param containerID the ID of the container that has failed
+   */
+  public void handleContainerLaunchFail(String containerID, String resourceID,
+      AbstractContainerAllocator containerAllocator) {
+
+    if (StandbyTaskUtil.isStandbyContainer(containerID)) {
+      log.info("Handling launch fail for standby-container {}, requesting resource on any host {}", containerID);
+      containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST);
+    } else {
+      initiateActiveContainerFailover(containerID, resourceID, containerAllocator);
+    }
+  }
+
+  /**
+   *  If a standby container has stopped, then there are two possible cases
+   *    Case 1. during a failover, the standby container was stopped for an active's start, then we
+   *       1. request a resource on the standby's host to place the activeContainer, and
+   *       2. request anyhost to place this standby
+   *
+   *    Case 2. independent of a failover, the standby container stopped, in which case we proceed with its resource-request
+   * @param standbyContainerID SamzaContainerID of the standby container
+   * @param preferredHost Preferred host of the standby container
+   */
+  private void handleStandbyContainerStop(String standbyContainerID, String resourceID, String preferredHost,
+      AbstractContainerAllocator containerAllocator) {
+
+    // if this standbyContainerResource was stopped for a failover, we will find a metadata entry
+    Optional<StandbyContainerManager.FailoverMetadata> failoverMetadata = this.checkIfUsedForFailover(resourceID);
+
+    if (failoverMetadata.isPresent()) {
+      String activeContainerID = failoverMetadata.get().activeContainerID;
+      String standbyContainerHostname = failoverMetadata.get().getStandbyContainerHostname(resourceID);
+
+      log.info("Requesting resource for active container {} on host {}, and backup container {} on any host",
+          activeContainerID, standbyContainerHostname, standbyContainerID);
+
+      // request standbycontainer's host for active-container
+      SamzaResourceRequest resourceRequestForActive = containerAllocator.getResourceRequest(activeContainerID, standbyContainerHostname);
+      // record the resource request, before issuing it to avoid race with allocation-thread
+      failoverMetadata.get().recordResourceRequest(resourceRequestForActive);
+      containerAllocator.issueResourceRequest(resourceRequestForActive);
+
+      // request any-host for standby container
+      containerAllocator.requestResource(standbyContainerID, ResourceRequestState.ANY_HOST);
+    } else {
+      log.info("Issuing request for standby container {} on host {}, since this is not for a failover",
+          standbyContainerID, preferredHost);
+      containerAllocator.requestResource(standbyContainerID, preferredHost);
+    }
+  }
+
+  /**
+   * Initiates failover of an active container. If {@code selectStandby} finds a suitable running standby, the
+   * standby's resource and host are recorded in this active resource's failover metadata and the standby is stopped.
+   * Otherwise an ANY_HOST request is issued for the active container.
+   *
+   * @param containerID the samzaContainerID of the active-container
+   * @param resourceID  the samza-resource-ID of the container when it failed (used to index failover-state)
+   * @param containerAllocator allocator used to issue the ANY_HOST request when no standby is available
+   */
+  private void initiateActiveContainerFailover(String containerID, String resourceID,
+      AbstractContainerAllocator containerAllocator) {
+
+    Optional<Entry<String, SamzaResource>> candidate = this.selectStandby(containerID, resourceID);
+
+    if (!candidate.isPresent()) {
+      // No usable standby: fall back to placing the active container on any host
+      log.info("No standby container found for active container {}, making a request for {}", containerID,
+          ResourceRequestState.ANY_HOST);
+      samzaApplicationState.failoversToAnyHost.incrementAndGet();
+      containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST);
+      return;
+    }
+
+    String chosenStandbyID = candidate.get().getKey();
+    SamzaResource chosenStandbyResource = candidate.get().getValue();
+    String chosenResourceID = chosenStandbyResource.getResourceID();
+    String chosenHost = chosenStandbyResource.getHost();
+
+    // Record the chosen standby before stopping it, so that its stop can be recognized as part of this failover
+    FailoverMetadata metadata = this.registerActiveContainerFailure(containerID, resourceID);
+    metadata.updateStandbyContainer(chosenResourceID, chosenHost);
+
+    log.info("Initiating failover and stopping standby container, found standbyContainer {} = resource {}, "
+        + "for active container {}", chosenStandbyID, chosenResourceID, containerID);
+    samzaApplicationState.failoversToStandby.incrementAndGet();
+    this.clusterResourceManager.stopStreamProcessor(chosenStandbyResource);
+  }
+
+  /**
+   * Selects a running standby container for the given active container that has stopped. A standby is skipped if its
+   * current resource was already used in an earlier failover of the same active resource.
+   * TODO: enrich this method to select standbys intelligently based on lag, timestamp, load-balancing, etc.
+   *
+   * @param activeContainerID Samza containerID of the active container
+   * @param activeContainerResourceID ResourceID of the active container at the time of its last failure; may be null,
+   *                                  in which case no earlier failover is taken into account
+   * @return the selected standby's containerID and resource, or {@code Optional.empty()} if no running standby that
+   *         has not already been used is found
+   */
+  private Optional<Entry<String, SamzaResource>> selectStandby(String activeContainerID,
+      String activeContainerResourceID) {
+
+    List<String> standbyContainerIDs = this.standbyContainerConstraints.get(activeContainerID);
+    log.info("Standby containers {} for active container {}", standbyContainerIDs, activeContainerID);
+
+    // A container with no constraints entry has no standbys; without this guard the loop below would throw an NPE
+    if (standbyContainerIDs == null) {
+      log.info("Did not find any running standby container for active container {}", activeContainerID);
+      return Optional.empty();
+    }
+
+    // obtain any existing failover metadata
+    Optional<StandbyContainerManager.FailoverMetadata> failoverMetadata =
+        activeContainerResourceID == null ? Optional.empty() : this.getFailoverMetadata(activeContainerResourceID);
+
+    // Iterate over the running standby containers to find a standby resource that has not already been
+    // used for a failover of this active resource
+    for (String standbyContainerID : standbyContainerIDs) {
+
+      if (samzaApplicationState.runningContainers.containsKey(standbyContainerID)) {
+        SamzaResource standbyContainerResource = samzaApplicationState.runningContainers.get(standbyContainerID);
+
+        // use this standby if there was no previous failover for which this standbyResource was used
+        if (!(failoverMetadata.isPresent() && failoverMetadata.get().isStandbyResourceUsed(standbyContainerResource.getResourceID()))) {
+
+          log.info("Returning standby container {} in running state for active container {}", standbyContainerID,
+              activeContainerID);
+          return Optional.of(new Entry<>(standbyContainerID, standbyContainerResource));
+        }
+      }
+    }
+
+    log.info("Did not find any running standby container for active container {}", activeContainerID);
+    return Optional.empty();
+  }
+
+  /**
+   * Registers the failure of an active container, keyed by the resource ID it was running on. If failover metadata
+   * already exists for that resource ID, it is reused; otherwise new metadata is created and stored.
+   *
+   * @param activeContainerID Samza container ID of the failed active container
+   * @param activeContainerResourceID resource ID of the active container at the time of its failure
+   * @return the failover metadata stored for {@code activeContainerResourceID}
+   */
+  private FailoverMetadata registerActiveContainerFailure(String activeContainerID, String activeContainerResourceID) {
+    FailoverMetadata metadata = failovers.containsKey(activeContainerResourceID)
+        ? failovers.get(activeContainerResourceID)
+        : new FailoverMetadata(activeContainerID, activeContainerResourceID);
+    this.failovers.put(activeContainerResourceID, metadata);
+    return metadata;
+  }
+
+  /**
+   * Finds the failover, if any, in which the given standby resource was selected. The caller uses this to tell
+   * whether a stopped standby was stopped on purpose.
+   *
+   * @param standbyContainerResourceId resource ID of the standby container; null yields {@code Optional.empty()}
+   * @return metadata of the first failover that used this standby resource, or {@code Optional.empty()}
+   */
+  private Optional<FailoverMetadata> checkIfUsedForFailover(String standbyContainerResourceId) {
+    if (standbyContainerResourceId == null) {
+      return Optional.empty();
+    }
+
+    Optional<FailoverMetadata> match = failovers.values()
+        .stream()
+        .filter(metadata -> metadata.isStandbyResourceUsed(standbyContainerResourceId))
+        .findFirst();
+
+    match.ifPresent(metadata ->
+        log.info("Standby container with resource id {} was selected for failover of active container {}",
+            standbyContainerResourceId, metadata.activeContainerID));
+    return match;
+  }
+
+  /**
+   * Checks whether matching this SamzaResourceRequest to the given resource meets all standby-container constraints.
+   * The constraints are met when none of the containers the requested container is constrained against is pending
+   * launch or running on the resource's host.
+   *
+   * @param request The resource request to match.
+   * @param samzaResource The samzaResource to potentially match the request to.
+   * @return true if no conflicting container is pending or running on the resource's host, false otherwise
+   */
+  private boolean checkStandbyConstraints(SamzaResourceRequest request, SamzaResource samzaResource) {
+    String containerIDToStart = request.getContainerID();
+    String host = samzaResource.getHost();
+    List<String> containerIDsForStandbyConstraints = this.standbyContainerConstraints.get(containerIDToStart);
+
+    // Without a constraints entry there is nothing to conflict with (and iterating null would throw an NPE)
+    if (containerIDsForStandbyConstraints == null) {
+      log.warn("No standby constraints found for container {}, treating it as unconstrained", containerIDToStart);
+      return true;
+    }
+
+    // Check if any of these conflicting containers are running/launching on host
+    for (String containerID : containerIDsForStandbyConstraints) {
+      SamzaResource resource = samzaApplicationState.pendingContainers.get(containerID);
+
+      // return false if a conflicting container is pending for launch on the host
+      if (resource != null && resource.getHost().equals(host)) {
+        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
+            containerIDToStart, samzaResource.getHost(), containerID);
+        return false;
+      }
+
+      // return false if a conflicting container is running on the host
+      resource = samzaApplicationState.runningContainers.get(containerID);
+      if (resource != null && resource.getHost().equals(host)) {
+        log.info("Container {} cannot be started on host {} because container {} is already running on this host",
+            containerIDToStart, samzaResource.getHost(), containerID);
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Attempts to run a container on the given candidate resource, if doing so meets the standby container constraints
+   * (as evaluated by {@code checkStandbyConstraints}). If the constraints are not met, the resource is released and
+   * the request is cancelled. A replacement placement is then arranged:
+   *   - standby container: a new ANY_HOST request is made;
+   *   - active container whose request has no failover metadata: a new ANY_HOST request is made;
+   *   - active container whose request belongs to a failover: a new failover is initiated from the active
+   *     container's last known resource ID.
+   * In every constraint-failure case the {@code failedStandbyAllocations} counter is incremented.
+   *
+   * @param request The Samza container request
+   * @param preferredHost the preferred host associated with the container
+   * @param samzaResource the resource candidate
+   * @param containerAllocator used to run the container, or to issue follow-up resource requests
+   * @param resourceRequestState used to release the resource and cancel the request when constraints are not met
+   */
+  public void checkStandbyConstraintsAndRunStreamProcessor(SamzaResourceRequest request, String preferredHost,
+      SamzaResource samzaResource, AbstractContainerAllocator containerAllocator,
+      ResourceRequestState resourceRequestState) {
+    String containerID = request.getContainerID();
+
+    if (checkStandbyConstraints(request, samzaResource)) {
+      // This resource can be used to launch this container
+      log.info("Running container {} on {} meets standby constraints, preferredHost = {}", containerID, samzaResource.getHost(), preferredHost);
+      containerAllocator.runStreamProcessor(request, preferredHost);
+    } else if (StandbyTaskUtil.isStandbyContainer(containerID)) {
+      // This resource cannot be used to launch this standby container, so we make a new anyhost request
+      log.info("Running standby container {} on host {} does not meet standby constraints, cancelling resource request, releasing resource, and making a new ANY_HOST request",
+          containerID, samzaResource.getHost());
+      resourceRequestState.releaseUnstartableContainer(samzaResource, preferredHost);
+      resourceRequestState.cancelResourceRequest(request);
+      containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST);
+      samzaApplicationState.failedStandbyAllocations.incrementAndGet();
+    } else {
+      // This resource cannot be used to launch this active container, so we release it and either re-request
+      // any host or initiate a failover (see below)
+      log.warn("Running active container {} on host {} does not meet standby constraints, cancelling resource request, releasing resource",
+          containerID, samzaResource.getHost());
+      resourceRequestState.releaseUnstartableContainer(samzaResource, preferredHost);
+      resourceRequestState.cancelResourceRequest(request);
+
+      Optional<FailoverMetadata> failoverMetadata = getFailoverMetadata(request);
+
+      // if this request was not issued as part of a failover, simply request anyhost
+      if (!failoverMetadata.isPresent()) {
+        log.info("Requesting ANY_HOST for active container {}", containerID);
+        containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST);
+      } else {
+        log.info("Initiating failover for active container {}", containerID);
+        // we use the activeContainer's last resourceID to initiate the failover
+        String lastKnownResourceID = failoverMetadata.get().activeContainerResourceID;
+        initiateActiveContainerFailover(containerID, lastKnownResourceID, containerAllocator);
+      }
+
+      samzaApplicationState.failedStandbyAllocations.incrementAndGet();
+    }
+  }
+
+  /**
+   * Handles a resource request that expired before being fulfilled. The work is delegated to the standby-specific
+   * or active-specific handler, depending on the kind of container the request was made for.
+   *
+   * @param containerID the containerID for which the resource request was made
+   * @param request the expired resource request
+   * @param alternativeResource an already-allocated alternative resource, if one is available
+   * @param containerAllocator the container allocator, used to issue any required follow-up resource requests
+   * @param resourceRequestState used to cancel resource requests where required
+   */
+  public void handleExpiredResourceRequest(String containerID, SamzaResourceRequest request,
+      Optional<SamzaResource> alternativeResource, AbstractContainerAllocator containerAllocator,
+      ResourceRequestState resourceRequestState) {
+
+    boolean forStandby = StandbyTaskUtil.isStandbyContainer(containerID);
+    if (!forStandby) {
+      handleExpiredRequestForActiveContainer(containerID, request, alternativeResource, containerAllocator,
+          resourceRequestState);
+      return;
+    }
+
+    handleExpiredRequestForStandbyContainer(containerID, request, alternativeResource, containerAllocator,
+        resourceRequestState);
+  }
+
+  /**
+   * Handles an expired resource request that was made for placing a standby container. With no alternative
+   * resource, the request is cancelled and a new ANY_HOST request is made. Otherwise the standby is started on the
+   * alternative resource, subject to the standby constraints.
+   */
+  private void handleExpiredRequestForStandbyContainer(String containerID, SamzaResourceRequest request,
+      Optional<SamzaResource> alternativeResource, AbstractContainerAllocator containerAllocator,
+      ResourceRequestState resourceRequestState) {
+
+    if (!alternativeResource.isPresent()) {
+      // Nothing to fall back on, so start over with a fresh anyhost request
+      log.info("Handling expired request, requesting anyHost resource for standby container {}", containerID);
+      resourceRequestState.cancelResourceRequest(request);
+      containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST);
+      return;
+    }
+
+    // A standby may use the any-host alternative right away, as long as the standby constraints hold
+    SamzaResource fallbackResource = alternativeResource.get();
+    log.info("Handling expired request, standby container {} can be started on alternative resource {}", containerID, fallbackResource);
+    checkStandbyConstraintsAndRunStreamProcessor(request, ResourceRequestState.ANY_HOST, fallbackResource,
+        containerAllocator, resourceRequestState);
+  }
+
+  /**
+   * Handles an expired resource request that was made for placing an active container. The three cases are
+   * exhaustive:
+   *   1. The request belongs to a failover (failover metadata exists): the request is cancelled and a new failover is
+   *      initiated from the active container's last known resource ID. Any alternative resource is not used here.
+   *   2. No failover and an alternative resource exists: try to run the container on it, subject to the standby
+   *      constraints.
+   *   3. No failover and no alternative resource: cancel the request and issue a new ANY_HOST request.
+   */
+  private void handleExpiredRequestForActiveContainer(String containerID, SamzaResourceRequest request,
+      Optional<SamzaResource> alternativeResource, AbstractContainerAllocator containerAllocator,
+      ResourceRequestState resourceRequestState) {
+
+    Optional<FailoverMetadata> failoverMetadata = getFailoverMetadata(request);
+
+    if (failoverMetadata.isPresent()) {
+      // An active container that had failed, and whose subsequent resource request has expired, needs to be failed
+      // over to a new standby-candidate, so we initiate a failover
+      log.info("Handling expired request, initiating failover for active container {}", containerID);
+
+      resourceRequestState.cancelResourceRequest(request);
+
+      // we use the activeContainer's resourceID to initiate the failover
+      String lastKnownResourceID = failoverMetadata.get().activeContainerResourceID;
+      initiateActiveContainerFailover(containerID, lastKnownResourceID, containerAllocator);
+
+    } else if (alternativeResource.isPresent()) {
+      // An active container with no prior failure can be started on the alternative-any-host resource right away
+      log.info("Handling expired request, trying to run active container {} on alternative resource {}", containerID, alternativeResource.get());
+
+      checkStandbyConstraintsAndRunStreamProcessor(request, ResourceRequestState.ANY_HOST, alternativeResource.get(),
+          containerAllocator, resourceRequestState);
+
+    } else {
+      // An active container has no prior failure, and there is no alternative anyhost resource, so request anyhost
+      log.info("Handling expired request, requesting anyHost resource for active container {} because this active container has never failed", containerID);
+
+      resourceRequestState.cancelResourceRequest(request);
+      containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST);
+    }
+  }
+
+
+  /**
+   * Looks up the failover metadata registered for an active container's resource ID.
+   *
+   * @param activeContainerResourceID resource ID of the active container at the time of its failure
+   * @return the registered metadata, or {@code Optional.empty()} if none has been registered
+   */
+  private Optional<FailoverMetadata> getFailoverMetadata(String activeContainerResourceID) {
+    if (!this.failovers.containsKey(activeContainerResourceID)) {
+      return Optional.empty();
+    }
+    return Optional.of(this.failovers.get(activeContainerResourceID));
+  }
+
+  /**
+   * Finds the failover, if any, during which the given resource request was recorded.
+   *
+   * @param resourceRequest the resource request to look up
+   * @return metadata of the first failover containing the request, or {@code Optional.empty()}
+   */
+  private Optional<FailoverMetadata> getFailoverMetadata(SamzaResourceRequest resourceRequest) {
+    return this.failovers.values()
+        .stream()
+        .filter(metadata -> metadata.containsResourceRequest(resourceRequest))
+        .findFirst();
+  }
+
+
+  /**
+   * Returns the string form of the tracked failover metadata, keyed by the active container's resource ID.
+   */
+  @Override
+  public String toString() {
+    return failovers.toString();
+  }
+
+  /**
+   * Encapsulates metadata concerning the failover of an active container. It records the standby resources selected
+   * to take over from it, and the resource requests issued on its behalf.
+   *
+   * <p>The mutable collections are guarded by this object's monitor; every method that touches them is
+   * {@code synchronized}. The class is static because it does not use any state of the enclosing manager.
+   */
+  public static class FailoverMetadata {
+    public final String activeContainerID;
+    public final String activeContainerResourceID;
+
+    // Map of samza-container-resource ID to host, for each standby container selected for failover of the activeContainer
+    private final Map<String, String> selectedStandbyContainers;
+
+    // Resource requests issued during this failover
+    private final Set<SamzaResourceRequest> resourceRequests;
+
+    public FailoverMetadata(String activeContainerID, String activeContainerResourceID) {
+      this.activeContainerID = activeContainerID;
+      this.activeContainerResourceID = activeContainerResourceID;
+      this.selectedStandbyContainers = new HashMap<>();
+      this.resourceRequests = new HashSet<>();
+    }
+
+    // Check if this standbyContainerResourceID was used in this failover
+    public synchronized boolean isStandbyResourceUsed(String standbyContainerResourceID) {
+      return this.selectedStandbyContainers.containsKey(standbyContainerResourceID);
+    }
+
+    // Get the hostname corresponding to the standby resourceID (null if it was not selected in this failover)
+    public synchronized String getStandbyContainerHostname(String standbyContainerResourceID) {
+      return selectedStandbyContainers.get(standbyContainerResourceID);
+    }
+
+    // Add the standbyContainer resource to the list of standbyContainers used in this failover
+    public synchronized void updateStandbyContainer(String standbyContainerResourceID, String standbyContainerHost) {
+      this.selectedStandbyContainers.put(standbyContainerResourceID, standbyContainerHost);
+    }
+
+    // Add the samzaResourceRequest to the list of resource requests associated with this failover
+    public synchronized void recordResourceRequest(SamzaResourceRequest samzaResourceRequest) {
+      this.resourceRequests.add(samzaResourceRequest);
+    }
+
+    // Check if this resource request is used for this failover
+    public synchronized boolean containsResourceRequest(SamzaResourceRequest samzaResourceRequest) {
+      return this.resourceRequests.contains(samzaResourceRequest);
+    }
+
+    // Synchronized like the mutators: iterating the map/set while another thread updates them is unsafe
+    @Override
+    public synchronized String toString() {
+      return "[activeContainerID: " + this.activeContainerID + " activeContainerResourceID: "
+          + this.activeContainerResourceID + " selectedStandbyContainers:" + selectedStandbyContainers
+          + " resourceRequests: " + resourceRequests + "]";
+    }
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyTaskUtil.java b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyTaskUtil.java
new file mode 100644
index 0000000..c993445
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyTaskUtil.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskMode;
+
+
+/**
+ * Collection of util methods used for performing Standby-aware Container allocation in YARN.
+ */
+public class StandbyTaskUtil {
+  private static final String STANDBY_CONTAINER_ID_SEPARATOR = "-";
+  private static final String TASKNAME_SEPARATOR = "-";
+  private static final String STANDBY_TASKNAME_PREFIX = "Standby";
+
+  /**
+   * Returns true if the containerName implies a standby container, false otherwise.
+   * @param containerID The desired containerID
+   * @return
+   */
+  public static boolean isStandbyContainer(String containerID) {
+    return containerID.contains(STANDBY_CONTAINER_ID_SEPARATOR);
+  }
+
+  // Helper method to generate buddy containerIDs by appending the replica-number to the active-container's id.
+  public final static String getStandbyContainerId(String activeContainerId, int replicaNumber) {
+    return activeContainerId.concat(STANDBY_CONTAINER_ID_SEPARATOR).concat(String.valueOf(replicaNumber));
+  }
+
+  // Helper method to generate active container's ID by removing the replica-number from the standby container's id.
+  public final static String getActiveContainerId(String standbyContainerID) {
+    return standbyContainerID.split(STANDBY_CONTAINER_ID_SEPARATOR)[0];
+  }
+
+  // Helper method to get the standby task name by prefixing "Standby" to the corresponding active task's name.
+  public final static TaskName getStandbyTaskName(TaskName activeTaskName, int replicaNum) {
+    return new TaskName(STANDBY_TASKNAME_PREFIX.concat(TASKNAME_SEPARATOR)
+        .concat(activeTaskName.getTaskName())
+        .concat(TASKNAME_SEPARATOR)
+        .concat(String.valueOf(replicaNum)));
+  }
+
+  // Helper method to get the active task name by stripping the prefix "Standby" from the standby task name.
+  public final static TaskName getActiveTaskName(TaskName standbyTaskName) {
+    return new TaskName(standbyTaskName.getTaskName().split(TASKNAME_SEPARATOR)[1]);
+  }
+
+  /**
+   *  Given a containerID and job model, it returns the containerids of all containers that either have
+   *  a. standby tasks corresponding to active tasks on the given container, or
+   *  b. have active tasks corresponding to standby tasks on the given container.
+   *  This is used to ensure that an active task and all its corresponding standby tasks are on separate hosts, and
+   *  standby tasks corresponding to the same active task are on separate hosts.
+   */
+  public static List<String> getStandbyContainerConstraints(String containerID, JobModel jobModel) {
+
+    ContainerModel givenContainerModel = jobModel.getContainers().get(containerID);
+    List<String> containerIDsWithStandbyConstraints = new ArrayList<>();
+
+    // iterate over all containerModels in the jobModel
+    for (ContainerModel containerModel : jobModel.getContainers().values()) {
+
+      // add to list if active and standby tasks on the two containerModels overlap
+      if (!givenContainerModel.equals(containerModel) && checkTaskOverlap(givenContainerModel, containerModel)) {
+        containerIDsWithStandbyConstraints.add(containerModel.getId());
+      }
+    }
+    return containerIDsWithStandbyConstraints;
+  }
+
+  // Helper method that checks if tasks on the two containerModels overlap
+  private static boolean checkTaskOverlap(ContainerModel containerModel1, ContainerModel containerModel2) {
+    Set<TaskName> activeTasksOnContainer1 = getCorrespondingActiveTasks(containerModel1);
+    Set<TaskName> activeTasksOnContainer2 = getCorrespondingActiveTasks(containerModel2);
+    return !Collections.disjoint(activeTasksOnContainer1, activeTasksOnContainer2);
+  }
+
+  // Helper method that returns the active tasks corresponding to all standby tasks on a container, including any already-active tasks on the container
+  private static Set<TaskName> getCorrespondingActiveTasks(ContainerModel containerModel) {
+    Set<TaskName> tasksInActiveMode = getAllTasks(containerModel, TaskMode.Active);
+    tasksInActiveMode.addAll(getAllTasks(containerModel, TaskMode.Standby).stream()
+        .map(taskName -> getActiveTaskName(taskName))
+        .collect(Collectors.toSet()));
+    return tasksInActiveMode;
+  }
+
+  // Helper method to getAllTaskModels of this container in the given taskMode
+  private static Set<TaskName> getAllTasks(ContainerModel containerModel, TaskMode taskMode) {
+    return containerModel.getTasks()
+        .values()
+        .stream()
+        .filter(e -> e.getTaskMode().equals(taskMode))
+        .map(taskModel -> taskModel.getTaskName())
+        .collect(Collectors.toSet());
+  }
+
+}
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperProxy.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperProxy.java
index c7d556a..5fd9ceb 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperProxy.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperProxy.java
@@ -23,6 +23,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.samza.clustermanager.StandbyTaskUtil;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.TaskMode;
@@ -60,9 +61,6 @@ import org.slf4j.LoggerFactory;
 public class TaskNameGrouperProxy {
 
   private static final Logger LOG = LoggerFactory.getLogger(TaskNameGrouperProxy.class);
-  private static final String CONTAINER_ID_SEPARATOR = "-";
-  private static final String TASKNAME_SEPARATOR = "-";
-  private static final String STANDBY_TASKNAME_PREFIX = "Standby";
   private final TaskNameGrouper taskNameGrouper;
   private final boolean standbyTasksEnabled;
   private final int replicationFactor;
@@ -104,7 +102,7 @@ public class TaskNameGrouperProxy {
 
     for (ContainerModel activeContainer : containerModels) {
       for (int replicaNum = 0; replicaNum < replicationFactor - 1; replicaNum++) {
-        String buddyContainerId = getBuddyContainerId(activeContainer.getId(), replicaNum);
+        String buddyContainerId = StandbyTaskUtil.getStandbyContainerId(activeContainer.getId(), replicaNum);
 
         ContainerModel buddyContainerModel =
             new ContainerModel(buddyContainerId, getTaskModelForBuddyContainer(activeContainer.getTasks(), replicaNum));
@@ -124,7 +122,7 @@ public class TaskNameGrouperProxy {
     Map<TaskName, TaskModel> standbyTaskModels = new HashMap<>();
 
     for (TaskName taskName : activeContainerTaskModel.keySet()) {
-      TaskName standbyTaskName = getStandbyTaskName(taskName, replicaNum);
+      TaskName standbyTaskName = StandbyTaskUtil.getStandbyTaskName(taskName, replicaNum);
       TaskModel standbyTaskModel =
           new TaskModel(standbyTaskName, activeContainerTaskModel.get(taskName).getSystemStreamPartitions(),
               activeContainerTaskModel.get(taskName).getChangelogPartition(), TaskMode.Standby);
@@ -135,17 +133,4 @@ public class TaskNameGrouperProxy {
         activeContainerTaskModel);
     return standbyTaskModels;
   }
-
-  // Helper method to generate buddy containerIDs by appending the replica-number to the active-container's id.
-  private final static String getBuddyContainerId(String activeContainerId, int replicaNumber) {
-    return activeContainerId.concat(CONTAINER_ID_SEPARATOR).concat(String.valueOf(replicaNumber));
-  }
-
-  // Helper method to get the standby task name by prefixing "Standby" to the corresponding active task's name.
-  private final static TaskName getStandbyTaskName(TaskName activeTaskName, int replicaNum) {
-    return new TaskName(STANDBY_TASKNAME_PREFIX.concat(TASKNAME_SEPARATOR)
-        .concat(activeTaskName.getTaskName())
-        .concat(TASKNAME_SEPARATOR)
-        .concat(String.valueOf(replicaNum)));
-  }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
index 42f7f4b..e662fa1 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Set;
 
 import java.util.stream.Collectors;
+import org.apache.samza.clustermanager.StandbyTaskUtil;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.TaskMode;
 import org.apache.samza.serializers.model.SamzaObjectMapper;
@@ -204,6 +205,7 @@ public class StorageManagerUtil {
 
   /**
    * Creates and returns a File pointing to the directory for the given store and task, given a particular base directory.
+   * In case of a standby task (TaskMode.Standby), the storeDirectory is the same as it would be for an active task.
    *
    * @param storeBaseDir the base directory to use
    * @param storeName the store name to use
@@ -212,7 +214,10 @@ public class StorageManagerUtil {
    * @return the partition directory for the store
    */
   public static File getStorePartitionDir(File storeBaseDir, String storeName, TaskName taskName, TaskMode taskMode) {
-    // TODO: use task-Mode to decide the storePartitionDir -- standby's dir should be the same as active
-    return new File(storeBaseDir, (storeName + File.separator + taskName.toString()).replace(' ', '_'));
+    TaskName taskNameForDirName = taskName;
+    // A standby task shares its active task's store directory, so map the standby name back to the active name
+    if (taskMode.equals(TaskMode.Standby)) {
+      taskNameForDirName =  StandbyTaskUtil.getActiveTaskName(taskName);
+    }
+    // Spaces in the store/task path are replaced with underscores
+    return new File(storeBaseDir, (storeName + File.separator + taskNameForDirName.toString()).replace(' ', '_'));
   }
 }
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
index 15cb18f..265e56f 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
@@ -69,6 +69,10 @@ class ContainerProcessManagerMetrics(
           }
       })
 
+     val mFailedStandbyAllocations = newGauge("failed-standby-allocations", () => state.failedStandbyAllocations.get())
+     val mFailoversToAnyHost = newGauge("failovers-to-any-host", () => state.failoversToAnyHost.get())
+     val mFailoversToStandby = newGauge("failovers-to-standby", () => state.failoversToStandby.get())
+
     jvm.start
     reporters.values.foreach(_.start)
   }
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
index 471c7fe..4545a75 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
@@ -93,6 +93,11 @@ public class MockClusterResourceManager extends ClusterResourceManager {
     launchCountSemaphore.release();
   }
 
+  /**
+   * Intentionally a no-op in this mock: stopping a stream processor has no effect here.
+   */
+  @Override
+  public void stopStreamProcessor(SamzaResource resource) {
+    // no op
+  }
+
   public void registerContainerListener(MockContainerListener listener) {
     mockContainerListeners.add(listener);
   }
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerRequestState.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerRequestState.java
index 3aa58b2..676018b 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerRequestState.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerRequestState.java
@@ -70,8 +70,8 @@ public class MockContainerRequestState extends ResourceRequestState {
   }
 
   @Override
-  public void releaseUnstartableContainer(SamzaResource container) {
-    super.releaseUnstartableContainer(container);
+  public void releaseUnstartableContainer(SamzaResource container, String preferredHost) {
+    super.releaseUnstartableContainer(container, preferredHost); // delegate to ResourceRequestState, then count the release and notify mock listeners below
 
     numReleasedContainers += 1;
     for (MockContainerListener listener : mockContainerListeners) {
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockHostAwareContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockHostAwareContainerAllocator.java
index ea05fff..1285c5f 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockHostAwareContainerAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockHostAwareContainerAllocator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.clustermanager;
 
+import java.util.Optional;
 import org.apache.samza.config.Config;
 
 import java.lang.reflect.Field;
@@ -31,7 +32,7 @@ public class MockHostAwareContainerAllocator extends HostAwareContainerAllocator
 
   public MockHostAwareContainerAllocator(ClusterResourceManager manager,
       Config config, SamzaApplicationState state) {
-    super(manager, ALLOCATOR_TIMEOUT_MS, config, state);
+    super(manager, ALLOCATOR_TIMEOUT_MS, config, Optional.empty(), state); // NOTE(review): empty Optional presumably disables the standby-aware path in this mock; confirm
   }
 
   /**
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index d648a80..841e5ba 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -418,7 +418,7 @@ public class TestContainerProcessManager {
   public void testAllBufferedResourcesAreUtilized() throws Exception {
     Map<String, String> config = new HashMap<>();
     config.putAll(getConfigWithHostAffinity());
-    config.put("cluster-manager.container.count", "2");
+    config.put("job.container.count", "2"); // NOTE(review): replaces cluster-manager.container.count; presumably the key the code under test now reads, confirm
     Config cfg = new MapConfig(config);
     // 1. Request two containers on hosts - host1 and host2
     state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", "host1",
@@ -475,6 +475,7 @@ public class TestContainerProcessManager {
     if (!allocator.awaitContainersStart(2, 2, TimeUnit.SECONDS)) {
       fail("timed out waiting for the containers to start");
     }
+    taskManager.onStreamProcessorLaunchSuccess(resource2); // report both started resources as launched before asserting job health
     taskManager.onStreamProcessorLaunchSuccess(resource3);
 
     assertTrue(state.jobHealthy.get());
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
index 3e5e785..fd3b452 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
@@ -21,6 +21,7 @@ package org.apache.samza.clustermanager;
 import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional; // for the Optional argument now passed to HostAwareContainerAllocator in setup()
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -73,7 +74,7 @@ public class TestHostAwareContainerAllocator {
 
   @Before
   public void setup() throws Exception {
-    containerAllocator = new HostAwareContainerAllocator(clusterResourceManager, timeoutMillis, config, state);
+    containerAllocator = new HostAwareContainerAllocator(clusterResourceManager, timeoutMillis, config, Optional.empty(), state);
     requestState = new MockContainerRequestState(clusterResourceManager, true);
     Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("resourceRequestState");
     requestStateField.setAccessible(true);
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestStandbyAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestStandbyAllocator.java
new file mode 100644
index 0000000..c075f14
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestStandbyAllocator.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.samza.Partition;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests {@link StandbyTaskUtil#getStandbyContainerConstraints} on job models with and without standby replicas.
+ */
+public class TestStandbyAllocator {
+
+  @Test
+  public void testWithNoStandby() {
+    JobModel jobModel = getJobModelWithStandby(1, 1, 1);
+    List<String> containerConstraints = StandbyTaskUtil.getStandbyContainerConstraints("0", jobModel);
+    Assert.assertEquals("Constrained container count should be 0", 0, containerConstraints.size());
+  }
+
+  @Test
+  public void testWithStandby() {
+    verifyStandbyConstraints(2, 1, 2);
+    verifyStandbyConstraints(10, 1, 2);
+
+    verifyStandbyConstraints(2, 10, 2);
+    verifyStandbyConstraints(2, 10, 4);
+
+    verifyStandbyConstraints(10, 1, 4);
+    verifyStandbyConstraints(10, 10, 4);
+  }
+
+  /**
+   * Builds a job model with the given shape and checks, for every container in it, that the standby
+   * constraints from {@link StandbyTaskUtil#getStandbyContainerConstraints} are existing container IDs,
+   * number exactly {@code replicationFactor - 1}, exclude the container itself, and share the
+   * container's ID prefix (the part before the first '-').
+   */
+  private void verifyStandbyConstraints(int nContainers, int nTasks, int replicationFactor) {
+    JobModel jobModel = getJobModelWithStandby(nContainers, nTasks, replicationFactor);
+
+    for (String containerID : jobModel.getContainers().keySet()) {
+      List<String> containerConstraints = StandbyTaskUtil.getStandbyContainerConstraints(containerID, jobModel);
+
+      Assert.assertTrue("Constrained container should be valid containers",
+          jobModel.getContainers().keySet().containsAll(containerConstraints));
+
+      Assert.assertEquals("Constrained container count should be (replication factor-1)", replicationFactor - 1,
+          containerConstraints.size());
+
+      Assert.assertFalse("Constrained containers list should not have the container itself",
+          containerConstraints.contains(containerID));
+
+      containerConstraints.forEach(containerConstraintID -> {
+          Assert.assertTrue("Constrained containers IDs should correspond to the active container",
+              containerID.split("-")[0].equals(containerConstraintID.split("-")[0]));
+        });
+    }
+  }
+
+  // Helper method to create a jobmodel with given number of containers, tasks and replication factor
+  private JobModel getJobModelWithStandby(int nContainers, int nTasks, int replicationFactor) {
+    Map<String, ContainerModel> containerModels = new HashMap<>();
+    int taskID = 0;
+
+    for (int j = 0; j < nContainers; j++) {
+      Map<TaskName, TaskModel> tasks = new HashMap<>();
+      for (int i = 0; i < nTasks; i++) {
+        TaskModel taskModel = getTaskModel(taskID++);
+        tasks.put(taskModel.getTaskName(), taskModel);
+      }
+      containerModels.put(String.valueOf(j), new ContainerModel(String.valueOf(j), tasks));
+    }
+
+    Map<String, ContainerModel> standbyContainerModels = new HashMap<>();
+    for (int i = 0; i < replicationFactor - 1; i++) {
+      for (String containerID : containerModels.keySet()) {
+        String standbyContainerId = StandbyTaskUtil.getStandbyContainerId(containerID, i);
+        Map<TaskName, TaskModel> standbyTasks = getStandbyTasks(containerModels.get(containerID).getTasks(), i);
+        standbyContainerModels.put(standbyContainerId, new ContainerModel(standbyContainerId, standbyTasks));
+      }
+    }
+
+    containerModels.putAll(standbyContainerModels);
+    return new JobModel(new MapConfig(), containerModels);
+  }
+
+  // Helper method that creates a taskmodel with one input ssp
+  private static TaskModel getTaskModel(int partitionNum) {
+    return new TaskModel(new TaskName("Partition " + partitionNum),
+        Collections.singleton(new SystemStreamPartition("test-system", "test-stream", new Partition(partitionNum))),
+        new Partition(partitionNum), TaskMode.Active);
+  }
+
+  // Helper method to create standby-taskModels from active-taskModels
+  private Map<TaskName, TaskModel> getStandbyTasks(Map<TaskName, TaskModel> tasks, int replicaNum) {
+    Map<TaskName, TaskModel> standbyTasks = new HashMap<>();
+    tasks.forEach((taskName, taskModel) -> {
+        TaskName standbyTaskName = StandbyTaskUtil.getStandbyTaskName(taskName, replicaNum);
+        standbyTasks.put(standbyTaskName,
+            new TaskModel(standbyTaskName, taskModel.getSystemStreamPartitions(), taskModel.getChangelogPartition(),
+                TaskMode.Standby));
+      });
+    return standbyTasks;
+  }
+}
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index 53b61d9..6d0fdb1 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -227,7 +227,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
    */
   @Override
   public void requestResources(SamzaResourceRequest resourceRequest) {
-    log.info("Requesting resources on  " + resourceRequest.getPreferredHost() + " for container " + resourceRequest.getContainerID());
+    log.info("Requesting resources on " + resourceRequest.getPreferredHost() + " for container " + resourceRequest.getContainerID());
 
     int memoryMb = resourceRequest.getMemoryMB();
     int cpuCores = resourceRequest.getNumCores();
@@ -314,6 +314,24 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
     }
   }
 
+  /**
+   * Asynchronously asks the NodeManager to stop the YARN container backing {@code resource}.
+   * A resource that is not present in {@code allocatedResources} is logged and ignored instead of
+   * failing with a NullPointerException.
+   */
+  @Override
+  public void stopStreamProcessor(SamzaResource resource) {
+    synchronized (lock) {
+      if (!allocatedResources.containsKey(resource)) {
+        log.warn("Cannot stop resource {}: it is not among the allocated resources", resource);
+        return;
+      }
+      log.info("Stopping resource {}", resource);
+      this.nmClientAsync.stopContainerAsync(allocatedResources.get(resource).getId(),
+          allocatedResources.get(resource).getNodeId());
+    }
+  }
+
   /**
    * Given a lookupContainerId from Yarn (for example: containerId_app_12345, this method returns the SamzaContainer ID
    * in the range [0,N-1] that maps to it.
@@ -520,7 +538,8 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
 
   @Override
   public void onContainerStopped(ContainerId containerId) {
-    log.info("Got a notification from the NodeManager for a stopped container. ContainerId: {}", containerId);
+    log.info("Got a notification from the NodeManager for a stopped container. ContainerId: {} samzaContainerId {}",
+        containerId, getIDForContainer(containerId.toString()));
   }
 
   @Override
@@ -549,6 +568,28 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
   @Override
   public void onStopContainerError(ContainerId containerId, Throwable t) {
     log.info("Got an error when stopping container from the NodeManager. ContainerId: {}. Error: {}", containerId, t);
+    String samzaContainerId = getIDForContainer(containerId.toString());
+    if (samzaContainerId == null) {
+      log.info("Got an invalid notification for container: {}", containerId.toString());
+      return;
+    }
+
+    YarnContainer container = state.runningYarnContainers.get(samzaContainerId);
+    if (container == null) {
+      log.warn("No running Yarn container is tracked for Samza ContainerId: {}; not retrying stop of {}",
+          samzaContainerId, containerId);
+      return;
+    }
+
+    log.info("Failed Stop on Yarn Container: {} had Samza ContainerId: {} ", containerId.toString(), samzaContainerId);
+    SamzaResource resource = new SamzaResource(container.resource().getVirtualCores(),
+        container.resource().getMemory(), container.nodeId().getHost(), containerId.toString());
+
+    // For now, we retry the stopping of the container.
+    // NOTE(review): the retry is unbounded -- every failed stop calls back into this method. Also confirm that
+    // SamzaResource equality lets this freshly built instance match the key held in allocatedResources.
+    log.info("Re-invoking stop stream processor for container: {}", containerId);
+    this.stopStreamProcessor(resource);
   }
 
   /**