You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2017/08/09 17:50:55 UTC

[09/23] samza git commit: SAMZA-1359; Handle phantom container notifications cleanly during an RM fail-over

SAMZA-1359; Handle phantom container notifications cleanly during an RM fail-over

1. Improved our container handling logic to be resilient to phantom notifications.
2. Added a new metric to Samza's ContainerProcessManager module that tracks the number of such invalid notifications.
3. Added a couple of tests that simulate the exact scenario we encountered during the cluster upgrade (container starts -> container fails -> legitimate notification for the failure -> container re-start -> RM fail-over -> phantom notification with a different exit code).
4. As an aside, a whole bunch of tests for ContainerProcessManager relied on Thread.sleep to ensure that threads got to run in a certain order. Removed this non-determinism and made those tests predictable.

Author: Jagadish Venkatraman <jv...@jvenkatr-mn2.linkedin.biz>

Reviewers: Jake Maes <jm...@linkedin.com>

Closes #243 from vjagadish1989/am-bug


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/35143b67
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/35143b67
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/35143b67

Branch: refs/heads/0.14.0
Commit: 35143b676f23f69324b442fd1061318a663538f0
Parents: 91b22fd
Author: Jagadish Venkatraman <jv...@jvenkatr-mn2.linkedin.biz>
Authored: Tue Jul 25 11:18:14 2017 -0700
Committer: Jagadish <ja...@apache.org>
Committed: Tue Jul 25 11:18:14 2017 -0700

----------------------------------------------------------------------
 .../clustermanager/ContainerProcessManager.java | 145 +++++++------
 .../clustermanager/SamzaApplicationState.java   |   8 +
 .../ContainerProcessManagerMetrics.scala        |   1 +
 .../clustermanager/MockContainerAllocator.java  |  24 +++
 .../TestContainerProcessManager.java            | 207 ++++++++++++++++---
 5 files changed, 283 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/35143b67/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index 9b5e871..2861e9e 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -239,9 +239,10 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
     }
     if (containerId == null) {
       log.info("No matching container id found for " + containerStatus.toString());
-    } else {
-      state.runningContainers.remove(containerId);
+      state.redundantNotifications.incrementAndGet();
+      return;
     }
+    state.runningContainers.remove(containerId);
 
     int exitStatus = containerStatus.getExitCode();
     switch (exitStatus) {
@@ -250,10 +251,8 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
 
         state.completedContainers.incrementAndGet();
 
-        if (containerId != null) {
-          state.finishedContainers.incrementAndGet();
-          containerFailures.remove(containerId);
-        }
+        state.finishedContainers.incrementAndGet();
+        containerFailures.remove(containerId);
 
         if (state.completedContainers.get() == state.containerCount.get()) {
           log.info("Setting job status to SUCCEEDED, since all containers have been marked as completed.");
@@ -273,18 +272,16 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
         state.releasedContainers.incrementAndGet();
 
         // If this container was assigned some partitions (a containerId), then
-        // clean up, and request a refactor container for the tasks. This only
+        // clean up, and request a new container for the tasks. This only
         // should happen if the container was 'lost' due to node failure, not
         // if the AM released the container.
-        if (containerId != null) {
-          log.info("Released container {} was assigned task group ID {}. Requesting a refactor container for the task group.", containerIdStr, containerId);
+        log.info("Released container {} was assigned task group ID {}. Requesting a new container for the task group.", containerIdStr, containerId);
 
-          state.neededContainers.incrementAndGet();
-          state.jobHealthy.set(false);
+        state.neededContainers.incrementAndGet();
+        state.jobHealthy.set(false);
 
-          // request a container on refactor host
-          containerAllocator.requestResource(containerId, ResourceRequestState.ANY_HOST);
-        }
+          // request a container on new host
+        containerAllocator.requestResource(containerId, ResourceRequestState.ANY_HOST);
         break;
 
       default:
@@ -296,72 +293,70 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
         state.failedContainersStatus.put(containerIdStr, containerStatus);
         state.jobHealthy.set(false);
 
-        if (containerId != null) {
-          state.neededContainers.incrementAndGet();
-          // Find out previously running container location
-          String lastSeenOn = state.jobModelManager.jobModel().getContainerToHostValue(containerId, SetContainerHostMapping.HOST_KEY);
-          if (!hostAffinityEnabled || lastSeenOn == null) {
-            lastSeenOn = ResourceRequestState.ANY_HOST;
+        state.neededContainers.incrementAndGet();
+        // Find out previously running container location
+        String lastSeenOn = state.jobModelManager.jobModel().getContainerToHostValue(containerId, SetContainerHostMapping.HOST_KEY);
+        if (!hostAffinityEnabled || lastSeenOn == null) {
+          lastSeenOn = ResourceRequestState.ANY_HOST;
+        }
+        log.info("Container was last seen on " + lastSeenOn);
+        // A container failed for an unknown reason. Let's check to see if
+        // we need to shutdown the whole app master if too many container
+        // failures have happened. The rules for failing are that the
+        // failure count for a task group id must be > the configured retry
+        // count, and the last failure (the one prior to this one) must have
+        // happened less than retry window ms ago. If retry count is set to
+        // 0, the app master will fail on any container failure. If the
+        // retry count is set to a number < 0, a container failure will
+        // never trigger an app master failure.
+        int retryCount = clusterManagerConfig.getContainerRetryCount();
+        int retryWindowMs = clusterManagerConfig.getContainerRetryWindowMs();
+
+        if (retryCount == 0) {
+          log.error("Container ID {} ({}) failed, and retry count is set to 0, so shutting down the application master, and marking the job as failed.", containerId, containerIdStr);
+
+          tooManyFailedContainers = true;
+        } else if (retryCount > 0) {
+          int currentFailCount;
+          long lastFailureTime;
+          if (containerFailures.containsKey(containerId)) {
+            ResourceFailure failure = containerFailures.get(containerId);
+            currentFailCount = failure.getCount() + 1;
+            lastFailureTime = failure.getLastFailure();
+          } else {
+            currentFailCount = 1;
+            lastFailureTime = 0L;
           }
-          log.info("Container was last seen on " + lastSeenOn);
-          // A container failed for an unknown reason. Let's check to see if
-          // we need to shutdown the whole app master if too many container
-          // failures have happened. The rules for failing are that the
-          // failure count for a task group id must be > the configured retry
-          // count, and the last failure (the one prior to this one) must have
-          // happened less than retry window ms ago. If retry count is set to
-          // 0, the app master will fail on any container failure. If the
-          // retry count is set to a number < 0, a container failure will
-          // never trigger an app master failure.
-          int retryCount = clusterManagerConfig.getContainerRetryCount();
-          int retryWindowMs = clusterManagerConfig.getContainerRetryWindowMs();
-
-          if (retryCount == 0) {
-            log.error("Container ID {} ({}) failed, and retry count is set to 0, so shutting down the application master, and marking the job as failed.", containerId, containerIdStr);
-
-            tooManyFailedContainers = true;
-          } else if (retryCount > 0) {
-            int currentFailCount;
-            long lastFailureTime;
-            if (containerFailures.containsKey(containerId)) {
-              ResourceFailure failure = containerFailures.get(containerId);
-              currentFailCount = failure.getCount() + 1;
-              lastFailureTime = failure.getLastFailure();
+          if (currentFailCount >= retryCount) {
+            long lastFailureMsDiff = System.currentTimeMillis() - lastFailureTime;
+
+            if (lastFailureMsDiff < retryWindowMs) {
+              log.error("Container ID " + containerId + "(" + containerIdStr + ") has failed " + currentFailCount +
+                      " times, with last failure " + lastFailureMsDiff + "ms ago. This is greater than retry count of " +
+                      retryCount + " and window of " + retryWindowMs + "ms , so shutting down the application master, and marking the job as failed.");
+
+              // We have too many failures, and we're within the window
+              // boundary, so shut down the app master.
+              tooManyFailedContainers = true;
+              state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
             } else {
-              currentFailCount = 1;
-              lastFailureTime = 0L;
-            }
-            if (currentFailCount >= retryCount) {
-              long lastFailureMsDiff = System.currentTimeMillis() - lastFailureTime;
-
-              if (lastFailureMsDiff < retryWindowMs) {
-                log.error("Container ID " + containerId + "(" + containerIdStr + ") has failed " + currentFailCount +
-                        " times, with last failure " + lastFailureMsDiff + "ms ago. This is greater than retry count of " +
-                        retryCount + " and window of " + retryWindowMs + "ms , so shutting down the application master, and marking the job as failed.");
-
-                // We have too many failures, and we're within the window
-                // boundary, so reset shut down the app master.
-                tooManyFailedContainers = true;
-                state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
-              } else {
-                log.info("Resetting fail count for container ID {} back to 1, since last container failure ({}) for " +
-                        "this container ID was outside the bounds of the retry window.", containerId, containerIdStr);
-
-                // Reset counter back to 1, since the last failure for this
-                // container happened outside the window boundary.
-                containerFailures.put(containerId, new ResourceFailure(1, System.currentTimeMillis()));
-              }
-            } else {
-              log.info("Current fail count for container ID {} is {}.", containerId, currentFailCount);
-              containerFailures.put(containerId, new ResourceFailure(currentFailCount, System.currentTimeMillis()));
+              log.info("Resetting fail count for container ID {} back to 1, since last container failure ({}) for " +
+                      "this container ID was outside the bounds of the retry window.", containerId, containerIdStr);
+
+              // Reset counter back to 1, since the last failure for this
+              // container happened outside the window boundary.
+              containerFailures.put(containerId, new ResourceFailure(1, System.currentTimeMillis()));
             }
+          } else {
+            log.info("Current fail count for container ID {} is {}.", containerId, currentFailCount);
+            containerFailures.put(containerId, new ResourceFailure(currentFailCount, System.currentTimeMillis()));
           }
+        }
 
-          if (!tooManyFailedContainers) {
-            log.info("Requesting a refactor container ");
-            // Request a refactor container
-            containerAllocator.requestResource(containerId, lastSeenOn);
-          }
+        if (!tooManyFailedContainers) {
+          log.info("Requesting a new container ");
+          // Request a new container
+          containerAllocator.requestResource(containerId, lastSeenOn);
         }
 
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/35143b67/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
index bde3fac..653fb4e 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
@@ -116,6 +116,14 @@ public class SamzaApplicationState {
 
   public final AtomicInteger matchedResourceRequests = new AtomicInteger(0);
 
+  /**
+   * Number of invalid container notifications.
+   *
+   * A notification is "invalid" if the corresponding container is not currently managed by the
+   * {@link ContainerProcessManager}
+   */
+  public final AtomicInteger redundantNotifications = new AtomicInteger(0);
+
   public SamzaApplicationState(JobModelManager jobModelManager) {
     this.jobModelManager = jobModelManager;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/35143b67/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
index 6c3081b..c396ed6 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
@@ -52,6 +52,7 @@ class ContainerProcessManagerMetrics(
     val mFailedContainers = newGauge("failed-containers", () => state.failedContainers.get())
     val mReleasedContainers = newGauge("released-containers", () => state.releasedContainers.get())
     val mContainers = newGauge("container-count", () => state.containerCount)
+    val mRedundantNotifications = newGauge("redundant-notifications", () => state.redundantNotifications.get())
 
     val mJobHealthy = newGauge("job-healthy", () => if (state.jobHealthy.get()) 1 else 0)
     val mLocalityMatchedRequests = newGauge(

http://git-wip-us.apache.org/repos/asf/samza/blob/35143b67/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocator.java
index 109ed47..449b484 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocator.java
@@ -23,9 +23,13 @@ import org.apache.samza.config.Config;
 import java.lang.reflect.Field;
 
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 public class MockContainerAllocator extends ContainerAllocator {
   public int requestedContainers = 0;
+  private Semaphore semaphore = new Semaphore(0);
 
   public MockContainerAllocator(ClusterResourceManager manager,
                                 Config config,
@@ -33,6 +37,20 @@ public class MockContainerAllocator extends ContainerAllocator {
     super(manager, config, state);
   }
 
+  /**
+   * Causes the current thread to block until the expected number of containers have started.
+   *
+   * @param numExpectedContainers the number of containers expected to start
+   * @param timeout the maximum time to wait
+   * @param unit the time unit of the {@code timeout} argument
+   *
+   * @return a boolean that specifies whether containers started within the timeout.
+   * @throws InterruptedException  if the current thread is interrupted while waiting
+   */
+  boolean awaitContainersStart(int numExpectedContainers, long timeout, TimeUnit unit) throws InterruptedException {
+    return semaphore.tryAcquire(numExpectedContainers, timeout, unit);
+  }
+
   @Override
   public void requestResources(Map<String, String> containerToHostMappings) {
     requestedContainers += containerToHostMappings.size();
@@ -45,4 +63,10 @@ public class MockContainerAllocator extends ContainerAllocator {
 
     return (ResourceRequestState) field.get(this);
   }
+
+  @Override
+  protected void runStreamProcessor(SamzaResourceRequest request, String preferredHost) {
+    super.runStreamProcessor(request, preferredHost);
+    semaphore.release();
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/35143b67/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index 8199559..6978341 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -32,16 +32,17 @@ import org.apache.samza.testUtils.MockHttpServer;
 import org.eclipse.jetty.servlet.DefaultServlet;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -173,15 +174,19 @@ public class TestContainerProcessManager {
         state);
 
     getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator);
-
+    CountDownLatch latch = new CountDownLatch(1);
     getPrivateFieldFromTaskManager("allocatorThread", taskManager).set(taskManager, new Thread() {
       public void run() {
         isRunning = true;
+        latch.countDown();
       }
     });
 
     taskManager.start();
-    Thread.sleep(1000);
+
+    if (!latch.await(2, TimeUnit.SECONDS)) {
+      Assert.fail("timed out waiting for the latch to expire");
+    }
 
     // Verify Allocator thread has started running
     assertTrue(isRunning);
@@ -206,40 +211,56 @@ public class TestContainerProcessManager {
     );
     taskManager.start();
 
-    Thread.sleep(100);
-
     Thread allocatorThread = (Thread) getPrivateFieldFromTaskManager("allocatorThread", taskManager).get(taskManager);
     assertTrue(allocatorThread.isAlive());
 
     taskManager.stop();
 
-    Thread.sleep(100);
     assertFalse(allocatorThread.isAlive());
-
   }
 
   /**
    * Test Task Manager should stop when all containers finish
    */
   @Test
-  public void testTaskManagerShouldStopWhenContainersFinish() {
+  public void testTaskManagerShouldStopWhenContainersFinish() throws Exception {
     Config conf = getConfig();
     state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
 
-    ContainerProcessManager taskManager =  new ContainerProcessManager(
-        new MapConfig(conf),
-        state,
-        new MetricsRegistryMap(),
-        manager
+    ContainerProcessManager taskManager = new ContainerProcessManager(
+            new MapConfig(conf),
+            state,
+            new MetricsRegistryMap(),
+            manager
     );
 
+    MockContainerAllocator allocator = new MockContainerAllocator(
+            manager,
+            conf,
+            state);
+
+    getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator);
+
+    Thread thread = new Thread(allocator);
+    getPrivateFieldFromTaskManager("allocatorThread", taskManager).set(taskManager, thread);
+
+    // start triggers a request
     taskManager.start();
 
     assertFalse(taskManager.shouldShutdown());
+    assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
 
-    taskManager.onResourceCompleted(new SamzaResourceStatus("123", "diagnostics", SamzaResourceStatus.SUCCESS));
+    SamzaResource container = new SamzaResource(1, 1024, "abc", "id0");
+    taskManager.onResourceAllocated(container);
 
+    // Allow container to run and update state
 
+    if (!allocator.awaitContainersStart(1,2, TimeUnit.SECONDS)) {
+      fail("timed out waiting for the containers to start");
+    }
+    assertFalse(taskManager.shouldShutdown());
+
+    taskManager.onResourceCompleted(new SamzaResourceStatus("id0", "diagnostics", SamzaResourceStatus.SUCCESS));
     assertTrue(taskManager.shouldShutdown());
   }
 
@@ -281,7 +302,9 @@ public class TestContainerProcessManager {
     taskManager.onResourceAllocated(container);
 
     // Allow container to run and update state
-    Thread.sleep(300);
+    if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
+      fail("timed out waiting for the containers to start");
+    }
 
     // Create first container failure
     taskManager.onResourceCompleted(new SamzaResourceStatus(container.getResourceID(), "diagnostics", 1));
@@ -299,7 +322,9 @@ public class TestContainerProcessManager {
     taskManager.onResourceAllocated(container);
 
     // Allow container to run and update state
-    Thread.sleep(1000);
+    if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
+      fail("timed out waiting for the containers to start");
+    }
 
     assertTrue(state.jobHealthy.get());
 
@@ -318,6 +343,117 @@ public class TestContainerProcessManager {
     taskManager.stop();
   }
 
+  @Test
+  public void testInvalidNotificationsAreIgnored() throws Exception {
+    Config conf = getConfig();
+
+    Map<String, String> config = new HashMap<>();
+    config.putAll(getConfig());
+    state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+
+    ContainerProcessManager taskManager = new ContainerProcessManager(
+            new MapConfig(conf),
+            state,
+            new MetricsRegistryMap(),
+            manager
+    );
+
+    MockContainerAllocator allocator = new MockContainerAllocator(
+            manager,
+            conf,
+            state);
+    getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator);
+
+    Thread thread = new Thread(allocator);
+    getPrivateFieldFromTaskManager("allocatorThread", taskManager).set(taskManager, thread);
+
+    // Start the task manager
+    taskManager.start();
+
+    SamzaResource container = new SamzaResource(1, 1000, "abc", "id1");
+    taskManager.onResourceAllocated(container);
+
+    // Allow container to run and update state
+    if (!allocator.awaitContainersStart(1,2, TimeUnit.SECONDS)) {
+      fail("timed out waiting for the containers to start");
+    }
+
+    // Send a failure notification (DISK_FAIL) for a container ID that was never allocated
+    taskManager.onResourceCompleted(new SamzaResourceStatus("invalidContainerID", "Disk failure", SamzaResourceStatus.DISK_FAIL));
+
+    // The above failure should not trigger any container requests, since it is for an invalid container ID
+    assertEquals(0, allocator.getContainerRequestState().numPendingRequests());
+    assertFalse(taskManager.shouldShutdown());
+    assertTrue(state.jobHealthy.get());
+    assertEquals(state.redundantNotifications.get(), 1);
+  }
+
+  @Test
+  public void testDuplicateNotificationsDoNotAffectJobHealth() throws Exception {
+    Config conf = getConfig();
+
+    Map<String, String> config = new HashMap<>();
+    config.putAll(getConfig());
+    state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+
+    ContainerProcessManager taskManager = new ContainerProcessManager(
+            new MapConfig(conf),
+            state,
+            new MetricsRegistryMap(),
+            manager
+    );
+
+    MockContainerAllocator allocator = new MockContainerAllocator(
+            manager,
+            conf,
+            state);
+    getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator);
+
+    Thread thread = new Thread(allocator);
+    getPrivateFieldFromTaskManager("allocatorThread", taskManager).set(taskManager, thread);
+
+    // Start the task manager
+    taskManager.start();
+    assertFalse(taskManager.shouldShutdown());
+    assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
+
+    SamzaResource container1 = new SamzaResource(1, 1000, "abc", "id1");
+    taskManager.onResourceAllocated(container1);
+
+    // Allow container to run and update state
+    if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
+      fail("timed out waiting for the containers to start");
+    }
+    assertEquals(0, allocator.getContainerRequestState().numPendingRequests());
+
+    // Create container failure - with ContainerExitStatus.DISKS_FAILED
+    taskManager.onResourceCompleted(new SamzaResourceStatus(container1.getResourceID(), "Disk failure", SamzaResourceStatus.DISK_FAIL));
+
+    // The above failure should trigger a container request
+    assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
+    assertFalse(taskManager.shouldShutdown());
+    assertFalse(state.jobHealthy.get());
+    assertEquals(2, manager.resourceRequests.size());
+    assertEquals(0, manager.releasedResources.size());
+    assertEquals(ResourceRequestState.ANY_HOST, allocator.getContainerRequestState().peekPendingRequest().getPreferredHost());
+
+    SamzaResource container2 = new SamzaResource(1, 1000, "abc", "id2");
+    taskManager.onResourceAllocated(container2);
+
+    // Allow container to run and update state
+    if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
+      fail("timed out waiting for the containers to start");
+    }
+    assertTrue(state.jobHealthy.get());
+
+    // Simulate a duplicate notification for container 1 with a different exit code
+    taskManager.onResourceCompleted(new SamzaResourceStatus(container1.getResourceID(), "Disk failure", SamzaResourceStatus.PREEMPTED));
+    // assert that a duplicate notification does not change metrics (including job health)
+    assertEquals(state.redundantNotifications.get(), 1);
+    assertEquals(2, manager.resourceRequests.size());
+    assertEquals(0, manager.releasedResources.size());
+    assertTrue(state.jobHealthy.get());
+  }
 
   /**
    * Test AM requests a new container when a task fails
@@ -329,8 +465,6 @@ public class TestContainerProcessManager {
 
     Map<String, String> config = new HashMap<>();
     config.putAll(getConfig());
-    config.remove("yarn.container.retry.count");
-
     state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
 
     ContainerProcessManager taskManager = new ContainerProcessManager(
@@ -354,14 +488,17 @@ public class TestContainerProcessManager {
     assertFalse(taskManager.shouldShutdown());
     assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
 
-    SamzaResource container = new SamzaResource(1, 1000, "abc", "id1");
-    taskManager.onResourceAllocated(container);
+    SamzaResource container1 = new SamzaResource(1, 1000, "abc", "id1");
+    taskManager.onResourceAllocated(container1);
 
     // Allow container to run and update state
-    Thread.sleep(300);
+    if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
+      fail("timed out waiting for the containers to start");
+    }
+    assertEquals(0, allocator.getContainerRequestState().numPendingRequests());
 
     // Create container failure - with ContainerExitStatus.DISKS_FAILED
-    taskManager.onResourceCompleted(new SamzaResourceStatus(container.getResourceID(), "Disk failure", SamzaResourceStatus.DISK_FAIL));
+    taskManager.onResourceCompleted(new SamzaResourceStatus(container1.getResourceID(), "Disk failure", SamzaResourceStatus.DISK_FAIL));
 
     // The above failure should trigger a container request
     assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
@@ -371,21 +508,37 @@ public class TestContainerProcessManager {
     assertEquals(0, manager.releasedResources.size());
     assertEquals(ResourceRequestState.ANY_HOST, allocator.getContainerRequestState().peekPendingRequest().getPreferredHost());
 
+    SamzaResource container2 = new SamzaResource(1, 1000, "abc", "id2");
+    taskManager.onResourceAllocated(container2);
+
+    // Allow container to run and update state
+    if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
+      fail("timed out waiting for the containers to start");
+    }
+
     // Create container failure - with ContainerExitStatus.PREEMPTED
-    taskManager.onResourceCompleted(new SamzaResourceStatus(container.getResourceID(), "Preemption",  SamzaResourceStatus.PREEMPTED));
+    taskManager.onResourceCompleted(new SamzaResourceStatus(container2.getResourceID(), "Preemption",  SamzaResourceStatus.PREEMPTED));
+    assertEquals(3, manager.resourceRequests.size());
 
     // The above failure should trigger a container request
     assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
     assertFalse(taskManager.shouldShutdown());
     assertFalse(state.jobHealthy.get());
     assertEquals(ResourceRequestState.ANY_HOST, allocator.getContainerRequestState().peekPendingRequest().getPreferredHost());
+    SamzaResource container3 = new SamzaResource(1, 1000, "abc", "id3");
+    taskManager.onResourceAllocated(container3);
+
+    // Allow container to run and update state
+    if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
+      fail("timed out waiting for the containers to start");
+    }
 
     // Create container failure - with ContainerExitStatus.ABORTED
-    taskManager.onResourceCompleted(new SamzaResourceStatus(container.getResourceID(), "Aborted", SamzaResourceStatus.ABORTED));
+    taskManager.onResourceCompleted(new SamzaResourceStatus(container3.getResourceID(), "Aborted", SamzaResourceStatus.ABORTED));
 
     // The above failure should trigger a container request
     assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
-    assertEquals(2, manager.resourceRequests.size());
+    assertEquals(4, manager.resourceRequests.size());
     assertEquals(0, manager.releasedResources.size());
     assertFalse(taskManager.shouldShutdown());
     assertFalse(state.jobHealthy.get());