You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2016/04/16 00:19:36 UTC

samza git commit: SAMZA-910 - Host Affinity - Fix sporadic failures in Container Allocator tests

Repository: samza
Updated Branches:
  refs/heads/master ed5be4f92 -> 1497bf6c2


SAMZA-910 - Host Affinity - Fix sporadic failures in Container Allocator tests


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1497bf6c
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1497bf6c
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1497bf6c

Branch: refs/heads/master
Commit: 1497bf6c2629dd165f02352480584d3b3a9e5c18
Parents: ed5be4f
Author: Jacob Maes <ja...@gmail.com>
Authored: Fri Apr 15 14:21:16 2016 -0700
Committer: Navina Ramesh <nr...@linkedin.com>
Committed: Fri Apr 15 14:21:16 2016 -0700

----------------------------------------------------------------------
 .../samza/job/yarn/TestContainerAllocator.java  | 235 +++-----------
 .../job/yarn/TestContainerAllocatorCommon.java  | 225 ++++++++++++++
 .../yarn/TestHostAwareContainerAllocator.java   | 306 +++++--------------
 .../job/yarn/util/MockContainerListener.java    | 146 ++++++---
 .../yarn/util/MockContainerRequestState.java    |  19 +-
 .../samza/job/yarn/util/MockContainerUtil.java  |  12 +
 6 files changed, 464 insertions(+), 479 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/1497bf6c/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
index b253f98..e21aded 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
@@ -19,35 +19,15 @@
 
 package org.apache.samza.job.yarn;
 
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.net.URL;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.YarnConfig;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.coordinator.JobCoordinator;
-import org.apache.samza.coordinator.server.HttpServer;
-import org.apache.samza.job.model.ContainerModel;
-import org.apache.samza.job.model.JobModel;
-import org.apache.samza.job.model.TaskModel;
-import org.apache.samza.job.yarn.util.MockContainerListener;
 import org.apache.samza.job.yarn.util.MockContainerRequestState;
-import org.apache.samza.job.yarn.util.MockContainerUtil;
-import org.apache.samza.job.yarn.util.MockHttpServer;
-import org.apache.samza.job.yarn.util.TestAMRMClientImpl;
 import org.apache.samza.job.yarn.util.TestUtil;
-import org.eclipse.jetty.servlet.DefaultServlet;
-import org.eclipse.jetty.servlet.ServletHolder;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -55,18 +35,9 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-public class TestContainerAllocator {
-  private static final String ANY_HOST = ContainerRequestState.ANY_HOST;
-  private final HttpServer server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class));
+public class TestContainerAllocator extends TestContainerAllocatorCommon {
 
-  private AMRMClientAsyncImpl amRmClientAsync;
-  private TestAMRMClientImpl testAMRMClient;
-  private MockContainerRequestState requestState;
-  private ContainerAllocator containerAllocator;
-  private ContainerUtil containerUtil;
-  private Thread allocatorThread;
-
-  private Config config = new MapConfig(new HashMap<String, String>() {
+  private final Config config = new MapConfig(new HashMap<String, String>() {
     {
       put("yarn.container.count", "1");
       put("systems.test-system.samza.factory", "org.apache.samza.job.yarn.MockSystemFactory");
@@ -77,68 +48,57 @@ public class TestContainerAllocator {
       put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde");
       put("yarn.container.retry.count", "1");
       put("yarn.container.retry.window.ms", "1999999999");
-      put("yarn.allocator.sleep.ms", "10");
+      put("yarn.container.request.timeout.ms", "3");
+      put("yarn.allocator.sleep.ms", "1");
     }
   });
 
-  private SamzaAppState state = new SamzaAppState(getCoordinator(1), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2);
+  @Override
+  protected Config getConfig() {
+    return config;
+  }
 
-  private JobCoordinator getCoordinator(int containerCount) {
-    Map<Integer, ContainerModel> containers = new java.util.HashMap<>();
-    for (int i = 0; i < containerCount; i++) {
-      ContainerModel container = new ContainerModel(i, new HashMap<TaskName, TaskModel>());
-      containers.put(i, container);
-    }
-    JobModel jobModel = new JobModel(config, containers);
-    return new JobCoordinator(jobModel, server, null);
+  @Override
+  protected MockContainerRequestState createContainerRequestState(
+      AMRMClientAsync<AMRMClient.ContainerRequest> amClient) {
+    return new MockContainerRequestState(amClient, false);
   }
 
-  @Before
-  public void setup() throws Exception {
-    // Create AMRMClient
-    testAMRMClient = new TestAMRMClientImpl(
-        TestUtil.getAppMasterResponse(
-            false,
-            new ArrayList<Container>(),
-            new ArrayList<ContainerStatus>()
-        ));
-    amRmClientAsync = TestUtil.getAMClient(testAMRMClient);
+  /**
+   * Test request containers with no containerToHostMapping makes the right number of requests
+   */
+  @Test
+  public void testRequestContainersWithNoMapping() throws Exception {
+    int containerCount = 4;
+    Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>();
+    for (int i = 0; i < containerCount; i++) {
+      containersToHostMapping.put(i, null);
+    }
+    allocatorThread.start();
 
-    // Initialize certain state variables (mostly to avoid NPE)
-    state.coordinatorUrl = new URL("http://localhost:7778/");
+    containerAllocator.requestContainers(containersToHostMapping);
 
-    requestState = new MockContainerRequestState(amRmClientAsync, false);
-    containerUtil = TestUtil.getContainerUtil(config, state);
-    containerAllocator = new ContainerAllocator(
-        amRmClientAsync,
-        containerUtil,
-        new YarnConfig(config)
-    );
-    Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState");
-    requestStateField.setAccessible(true);
-    requestStateField.set(containerAllocator, requestState);
+    assertNotNull(requestState);
 
-    allocatorThread = new Thread(containerAllocator);
-  }
+    assertNotNull(requestState.getRequestsQueue());
+    assertTrue(requestState.getRequestsQueue().size() == 4);
 
-  @After
-  public void teardown() throws Exception {
-    containerAllocator.setIsRunning(false);
-    allocatorThread.join();
+    // If host-affinity is not enabled, it doesn't update the requestMap
+    assertNotNull(requestState.getRequestsToCountMap());
+    assertTrue(requestState.getRequestsToCountMap().keySet().size() == 0);
   }
-
   /**
    * Adds all containers returned to ANY_HOST only
    */
   @Test
   public void testAddContainer() throws Exception {
-    assertNull(requestState.getContainersOnAHost("abc"));
+    assertNull(requestState.getContainersOnAHost("host1"));
     assertNull(requestState.getContainersOnAHost(ANY_HOST));
 
-    containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "abc", 123));
-    containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "xyz", 123));
+    containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "host1", 123));
+    containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "host3", 123));
 
-    assertNull(requestState.getContainersOnAHost("abc"));
+    assertNull(requestState.getContainersOnAHost("host1"));
     assertNotNull(requestState.getContainersOnAHost(ANY_HOST));
     assertTrue(requestState.getContainersOnAHost(ANY_HOST).size() == 2);
   }
@@ -150,10 +110,10 @@ public class TestContainerAllocator {
   public void testRequestContainers() throws Exception {
     Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>() {
       {
-        put(0, "abc");
-        put(1, "def");
+        put(0, "host1");
+        put(1, "host2");
         put(2, null);
-        put(3, "abc");
+        put(3, "host1");
       }
     };
 
@@ -174,121 +134,4 @@ public class TestContainerAllocator {
     assertTrue(requestState.getRequestsToCountMap().keySet().size() == 0);
   }
 
-  /**
-   * Test request containers with no containerToHostMapping makes the right number of requests
-   */
-  @Test
-  public void testRequestContainersWithNoMapping() throws Exception {
-    int containerCount = 4;
-    Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>();
-    for (int i = 0; i < containerCount; i++) {
-      containersToHostMapping.put(i, null);
-    }
-    allocatorThread.start();
-
-    containerAllocator.requestContainers(containersToHostMapping);
-
-    assertNotNull(requestState);
-
-    assertNotNull(requestState.getRequestsQueue());
-    assertTrue(requestState.getRequestsQueue().size() == 4);
-
-    // If host-affinty is not enabled, it doesn't update the requestMap
-    assertNotNull(requestState.getRequestsToCountMap());
-    assertTrue(requestState.getRequestsToCountMap().keySet().size() == 0);
-  }
-
-  /**
-   * If the container fails to start e.g because it fails to connect to a NM on a host that
-   * is down, the allocator should request a new container on a different host.
-   */
-  @Test
-  public void testRerequestOnAnyHostIfContainerStartFails() throws Exception {
-    final Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "2", 123);
-    final Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "1", 123);
-
-    ((MockContainerUtil) containerUtil).containerStartException = new IOException("You shall not... connect to the NM!");
-
-    // Set up our final asserts before starting the allocator thread
-    MockContainerListener listener = new MockContainerListener(2, 1, 2, null, new Runnable() {
-      @Override
-      public void run() {
-        // The failed container should be released. The successful one should not.
-        assertNotNull(testAMRMClient.getRelease());
-        assertEquals(1, testAMRMClient.getRelease().size());
-        assertTrue(testAMRMClient.getRelease().contains(container.getId()));
-      }
-    },
-        new Runnable() {
-          @Override
-          public void run() {
-            // Test that the first request assignment had a preferred host and the retry didn't
-            assertEquals(2, requestState.assignedRequests.size());
-
-            SamzaContainerRequest request = requestState.assignedRequests.remove();
-            assertEquals(0, request.expectedContainerId);
-            assertEquals("2", request.getPreferredHost());
-
-            request = requestState.assignedRequests.remove();
-            assertEquals(0, request.expectedContainerId);
-            assertEquals("ANY_HOST", request.getPreferredHost());
-
-            // This routine should be called after the retry is assigned, but before it's started.
-            // So there should still be 1 container needed.
-            assertEquals(1, state.neededContainers.get());
-          }
-        }
-    );
-    requestState.registerContainerListener(listener);
-
-    allocatorThread.start();
-
-    // Only request 1 container and we should see 2 assignments in the assertions above (because of the retry)
-    containerAllocator.requestContainer(0, "2");
-    containerAllocator.addContainer(container);
-    containerAllocator.addContainer(container1);
-
-    listener.verify();
-  }
-
-  /**
-   * Extra allocated containers that are returned by the RM and unused by the AM should be released.
-   * Containers are considered "extra" only when there are no more pending requests to fulfill
-   * @throws Exception
-   */
-  @Test
-  public void testAllocatorReleasesExtraContainers() throws Exception {
-    final Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "abc", 123);
-    final Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "abc", 123);
-    final Container container2 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "def", 123);
-
-    // Set up our final asserts before starting the allocator thread
-    MockContainerListener listener = new MockContainerListener(3, 2, 0, null, new Runnable() {
-      @Override
-      public void run() {
-        assertNotNull(testAMRMClient.getRelease());
-        assertEquals(2, testAMRMClient.getRelease().size());
-        assertTrue(testAMRMClient.getRelease().contains(container1.getId()));
-        assertTrue(testAMRMClient.getRelease().contains(container2.getId()));
-
-        // Test that state is cleaned up
-        assertEquals(0, requestState.getRequestsQueue().size());
-        assertEquals(0, requestState.getRequestsToCountMap().size());
-        assertNull(requestState.getContainersOnAHost("abc"));
-        assertNull(requestState.getContainersOnAHost("def"));
-      }
-    }, null);
-    requestState.registerContainerListener(listener);
-
-    allocatorThread.start();
-
-    containerAllocator.requestContainer(0, "abc");
-
-    containerAllocator.addContainer(container);
-    containerAllocator.addContainer(container1);
-    containerAllocator.addContainer(container2);
-
-    listener.verify();
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/1497bf6c/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocatorCommon.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocatorCommon.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocatorCommon.java
new file mode 100644
index 0000000..5badd29
--- /dev/null
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocatorCommon.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.YarnConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.server.HttpServer;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.job.yarn.util.MockContainerListener;
+import org.apache.samza.job.yarn.util.MockContainerRequestState;
+import org.apache.samza.job.yarn.util.MockContainerUtil;
+import org.apache.samza.job.yarn.util.MockHttpServer;
+import org.apache.samza.job.yarn.util.TestAMRMClientImpl;
+import org.apache.samza.job.yarn.util.TestUtil;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Handles all common fields/tests for ContainerAllocators.
+ */
+public abstract class TestContainerAllocatorCommon {
+  protected static final String ANY_HOST = ContainerRequestState.ANY_HOST;
+
+  protected final HttpServer server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class));
+
+  protected AMRMClientAsyncImpl amRmClientAsync;
+  protected TestAMRMClientImpl testAMRMClient;
+  protected MockContainerRequestState requestState;
+  protected AbstractContainerAllocator containerAllocator;
+  protected Thread allocatorThread;
+  protected ContainerUtil containerUtil;
+
+  protected SamzaAppState state = new SamzaAppState(getCoordinator(1), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2);
+
+  protected abstract Config getConfig();
+  protected abstract MockContainerRequestState createContainerRequestState(AMRMClientAsync<AMRMClient.ContainerRequest> amClient);
+
+  private JobCoordinator getCoordinator(int containerCount) {
+    Map<Integer, ContainerModel> containers = new java.util.HashMap<>();
+    for (int i = 0; i < containerCount; i++) {
+      ContainerModel container = new ContainerModel(i, new HashMap<TaskName, TaskModel>());
+      containers.put(i, container);
+    }
+    JobModel jobModel = new JobModel(getConfig(), containers);
+    return new JobCoordinator(jobModel, server, null);
+  }
+
+
+  @Before
+  public void setup() throws Exception {
+    // Create AMRMClient
+    testAMRMClient = new TestAMRMClientImpl(
+        TestUtil.getAppMasterResponse(
+            false,
+            new ArrayList<Container>(),
+            new ArrayList<ContainerStatus>()
+        ));
+    amRmClientAsync = TestUtil.getAMClient(testAMRMClient);
+
+    // Initialize certain state variables
+    state.coordinatorUrl = new URL("http://localhost:7778/");
+
+    containerUtil = TestUtil.getContainerUtil(getConfig(), state);
+
+    requestState = createContainerRequestState(amRmClientAsync);
+    containerAllocator = new HostAwareContainerAllocator(
+        amRmClientAsync,
+        containerUtil,
+        new YarnConfig(getConfig())
+    );
+    Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState");
+    requestStateField.setAccessible(true);
+    requestStateField.set(containerAllocator, requestState);
+
+    allocatorThread = new Thread(containerAllocator);
+  }
+
+  @After
+  public void teardown() throws Exception {
+    containerAllocator.setIsRunning(false);
+    allocatorThread.join();
+  }
+
+  /**
+   * If the container fails to start, e.g. because it fails to connect to an NM on a host that
+   * is down, the allocator should request a new container on a different host.
+   */
+  @Test
+  public void testRerequestOnAnyHostIfContainerStartFails() throws Exception {
+    final Container container = TestUtil
+        .getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "host2", 123);
+    final Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "host1", 123);
+
+    ((MockContainerUtil) containerUtil).containerStartException = new IOException("You shall not... connect to the NM!");
+
+    Runnable releasedContainerAssertions = new Runnable() {
+      @Override
+      public void run() {
+        // The failed container should be released. The successful one should not.
+        assertNotNull(testAMRMClient.getRelease());
+        assertEquals(1, testAMRMClient.getRelease().size());
+        assertTrue(testAMRMClient.getRelease().contains(container.getId()));
+      }
+    };
+
+    Runnable assignedContainerAssertions = new Runnable() {
+      @Override
+      public void run() {
+        // Test that the first request assignment had a preferred host and the retry didn't
+        assertEquals(2, requestState.assignedRequests.size());
+
+        SamzaContainerRequest request = requestState.assignedRequests.remove();
+        assertEquals(0, request.expectedContainerId);
+        assertEquals("host2", request.getPreferredHost());
+
+        request = requestState.assignedRequests.remove();
+        assertEquals(0, request.expectedContainerId);
+        assertEquals("ANY_HOST", request.getPreferredHost());
+
+        // This routine should be called after the retry is assigned, but before it's started.
+        // So there should still be 1 container needed because neededContainers should not be decremented for a failed start.
+        assertEquals(1, state.neededContainers.get());
+      }
+    };
+
+    // Set up our final asserts before starting the allocator thread
+    MockContainerListener listener = new MockContainerListener(2, 1, 2, 0, null, releasedContainerAssertions, assignedContainerAssertions, null);
+    requestState.registerContainerListener(listener);
+    state.neededContainers.set(1); // Normally this would be done in the SamzaTaskManager
+
+    // Only request 1 container and we should see 2 assignments in the assertions above (because of the retry)
+    containerAllocator.requestContainer(0, "host2");
+    containerAllocator.addContainer(container);
+    containerAllocator.addContainer(container1);
+
+    allocatorThread.start();
+
+    listener.verify();
+  }
+
+
+  /**
+   * Extra allocated containers that are returned by the RM and unused by the AM should be released.
+   * Containers are considered "extra" only when there are no more pending requests to fulfill
+   * @throws Exception
+   */
+  @Test
+  public void testAllocatorReleasesExtraContainers() throws Exception {
+    final Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "host1", 123);
+    final Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "host1", 123);
+    final Container container2 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "host2", 123);
+
+    Runnable releasedContainerAssertions = new Runnable() {
+      @Override
+      public void run() {
+        assertNotNull(testAMRMClient.getRelease());
+        assertEquals(2, testAMRMClient.getRelease().size());
+        assertTrue(testAMRMClient.getRelease().contains(container1.getId()));
+        assertTrue(testAMRMClient.getRelease().contains(container2.getId()));
+
+        // Test that state is cleaned up
+        assertEquals(0, requestState.getRequestsQueue().size());
+        assertEquals(0, requestState.getRequestsToCountMap().size());
+        assertNull(requestState.getContainersOnAHost("host1"));
+        assertNull(requestState.getContainersOnAHost("host2"));
+      }
+    };
+
+    // Set up our final asserts before starting the allocator thread
+    MockContainerListener listener = new MockContainerListener(3, 2, 0, 0, null, releasedContainerAssertions, null, null);
+    requestState.registerContainerListener(listener);
+
+    containerAllocator.requestContainer(0, "host1");
+
+    containerAllocator.addContainer(container);
+    containerAllocator.addContainer(container1);
+    containerAllocator.addContainer(container2);
+
+    allocatorThread.start();
+
+    listener.verify();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1497bf6c/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
index 93e430b..ead7200 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
@@ -18,36 +18,20 @@
  */
 package org.apache.samza.job.yarn;
 
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.net.URL;
-import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.YarnConfig;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.coordinator.JobCoordinator;
-import org.apache.samza.coordinator.server.HttpServer;
-import org.apache.samza.job.model.ContainerModel;
-import org.apache.samza.job.model.JobModel;
-import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.job.yarn.util.MockContainerListener;
 import org.apache.samza.job.yarn.util.MockContainerRequestState;
 import org.apache.samza.job.yarn.util.MockContainerUtil;
-import org.apache.samza.job.yarn.util.MockHttpServer;
-import org.apache.samza.job.yarn.util.TestAMRMClientImpl;
 import org.apache.samza.job.yarn.util.TestUtil;
-import org.eclipse.jetty.servlet.DefaultServlet;
-import org.eclipse.jetty.servlet.ServletHolder;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -55,19 +39,9 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-public class TestHostAwareContainerAllocator {
-  private static final String ANY_HOST = ContainerRequestState.ANY_HOST;
+public class TestHostAwareContainerAllocator extends TestContainerAllocatorCommon {
 
-  private final HttpServer server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class));
-
-  private AMRMClientAsyncImpl amRmClientAsync;
-  private TestAMRMClientImpl testAMRMClient;
-  private MockContainerRequestState requestState;
-  private HostAwareContainerAllocator containerAllocator;
-  private Thread allocatorThread;
-  private ContainerUtil containerUtil;
-
-  private Config config = new MapConfig(new HashMap<String, String>() {
+  private final Config config = new MapConfig(new HashMap<String, String>() {
     {
       put("yarn.container.count", "1");
       put("systems.test-system.samza.factory", "org.apache.samza.job.yarn.MockSystemFactory");
@@ -84,147 +58,14 @@ public class TestHostAwareContainerAllocator {
     }
   });
 
-  private Config getConfig() {
-    Map<String, String> map = new HashMap<>();
-    map.putAll(config);
-    return new MapConfig(map);
-  }
-
-  private SamzaAppState state = new SamzaAppState(getCoordinator(1), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2);
-
-  private JobCoordinator getCoordinator(int containerCount) {
-    Map<Integer, ContainerModel> containers = new java.util.HashMap<>();
-    for (int i = 0; i < containerCount; i++) {
-      ContainerModel container = new ContainerModel(i, new HashMap<TaskName, TaskModel>());
-      containers.put(i, container);
-    }
-    JobModel jobModel = new JobModel(getConfig(), containers);
-    return new JobCoordinator(jobModel, server, null);
-  }
-
-
-  @Before
-  public void setup() throws Exception {
-    // Create AMRMClient
-    testAMRMClient = new TestAMRMClientImpl(
-        TestUtil.getAppMasterResponse(
-            false,
-            new ArrayList<Container>(),
-            new ArrayList<ContainerStatus>()
-        ));
-    amRmClientAsync = TestUtil.getAMClient(testAMRMClient);
-
-    // Initialize certain state variables
-    state.coordinatorUrl = new URL("http://localhost:7778/");
-
-    containerUtil = TestUtil.getContainerUtil(getConfig(), state);
-
-    requestState = new MockContainerRequestState(amRmClientAsync, true);
-    containerAllocator = new HostAwareContainerAllocator(
-        amRmClientAsync,
-        containerUtil,
-        new YarnConfig(config)
-    );
-    Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState");
-    requestStateField.setAccessible(true);
-    requestStateField.set(containerAllocator, requestState);
-
-    allocatorThread = new Thread(containerAllocator);
+  @Override
+  protected Config getConfig() {
+    return config;
   }
 
-  @After
-  public void teardown() throws Exception {
-    containerAllocator.setIsRunning(false);
-    allocatorThread.join();
-  }
-
-  /**
-   * If the container fails to start e.g because it fails to connect to a NM on a host that
-   * is down, the allocator should request a new container on a different host.
-   */
-  @Test
-  public void testRerequestOnAnyHostIfContainerStartFails() throws Exception {
-    final Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "2", 123);
-    final Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "1", 123);
-
-    ((MockContainerUtil) containerUtil).containerStartException = new IOException("You shall not... connect to the NM!");
-
-    // Set up our final asserts before starting the allocator thread
-    MockContainerListener listener = new MockContainerListener(2, 1, 2, null, new Runnable() {
-      @Override
-      public void run() {
-        // The failed container should be released. The successful one should not.
-        assertNotNull(testAMRMClient.getRelease());
-        assertEquals(1, testAMRMClient.getRelease().size());
-        assertTrue(testAMRMClient.getRelease().contains(container.getId()));
-      }
-    },
-        new Runnable() {
-          @Override
-          public void run() {
-            // Test that the first request assignment had a preferred host and the retry didn't
-            assertEquals(2, requestState.assignedRequests.size());
-
-            SamzaContainerRequest request = requestState.assignedRequests.remove();
-            assertEquals(0, request.expectedContainerId);
-            assertEquals("2", request.getPreferredHost());
-
-            request = requestState.assignedRequests.remove();
-            assertEquals(0, request.expectedContainerId);
-            assertEquals("ANY_HOST", request.getPreferredHost());
-
-            // This routine should be called after the retry is assigned, but before it's started.
-            // So there should still be 1 container needed.
-            assertEquals(1, state.neededContainers.get());
-          }
-        }
-    );
-    requestState.registerContainerListener(listener);
-
-    // Only request 1 container and we should see 2 assignments in the assertions above (because of the retry)
-    containerAllocator.requestContainer(0, "2");
-    containerAllocator.addContainer(container1);
-    containerAllocator.addContainer(container);
-
-    allocatorThread.start();
-
-    listener.verify();
-  }
-
-  @Test
-  public void testAllocatorReleasesExtraContainers() throws Exception {
-    final Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "abc", 123);
-    final Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "abc", 123);
-    final Container container2 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "def", 123);
-
-    // Set up our final asserts before starting the allocator thread
-    MockContainerListener listener = new MockContainerListener(3, 2, 0, null, new Runnable() {
-      @Override
-      public void run() {
-        assertNotNull(testAMRMClient.getRelease());
-        assertEquals(2, testAMRMClient.getRelease().size());
-        assertTrue(testAMRMClient.getRelease().contains(container1.getId()));
-        assertTrue(testAMRMClient.getRelease().contains(container2.getId()));
-
-        // Test that state is cleaned up
-        assertEquals(0, requestState.getRequestsQueue().size());
-        assertEquals(0, requestState.getRequestsToCountMap().size());
-        assertNull(requestState.getContainersOnAHost("abc"));
-        assertNull(requestState.getContainersOnAHost("def"));
-      }
-    },
-    null);
-    requestState.registerContainerListener(listener);
-
-    allocatorThread.start();
-
-    containerAllocator.requestContainer(0, "abc");
-
-    containerAllocator.addContainer(container);
-    containerAllocator.addContainer(container1);
-    containerAllocator.addContainer(container2);
-
-    listener.verify();
+  @Override
+  protected MockContainerRequestState createContainerRequestState(AMRMClientAsync<AMRMClient.ContainerRequest> amClient) {
+    return new MockContainerRequestState(amClient, true);
   }
 
   /**
@@ -259,28 +100,28 @@ public class TestHostAwareContainerAllocator {
   public void testAddContainerWithHostAffinity() throws Exception {
     containerAllocator.requestContainers(new HashMap<Integer, String>() {
       {
-        put(0, "abc");
-        put(1, "xyz");
+        put(0, "host1");
+        put(1, "host3");
       }
     });
 
-    assertNotNull(requestState.getContainersOnAHost("abc"));
-    assertEquals(0, requestState.getContainersOnAHost("abc").size());
+    assertNotNull(requestState.getContainersOnAHost("host1"));
+    assertEquals(0, requestState.getContainersOnAHost("host1").size());
 
-    assertNotNull(requestState.getContainersOnAHost("xyz"));
-    assertEquals(0, requestState.getContainersOnAHost("xyz").size());
+    assertNotNull(requestState.getContainersOnAHost("host3"));
+    assertEquals(0, requestState.getContainersOnAHost("host3").size());
 
     assertNull(requestState.getContainersOnAHost(ANY_HOST));
 
-    containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "abc", 123));
-    containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000004"), "def", 123));
-    containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "xyz", 123));
+    containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "host1", 123));
+    containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000004"), "host2", 123));
+    containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "host3", 123));
 
-    assertNotNull(requestState.getContainersOnAHost("abc"));
-    assertEquals(1, requestState.getContainersOnAHost("abc").size());
+    assertNotNull(requestState.getContainersOnAHost("host1"));
+    assertEquals(1, requestState.getContainersOnAHost("host1").size());
 
-    assertNotNull(requestState.getContainersOnAHost("xyz"));
-    assertEquals(1, requestState.getContainersOnAHost("xyz").size());
+    assertNotNull(requestState.getContainersOnAHost("host3"));
+    assertEquals(1, requestState.getContainersOnAHost("host3").size());
 
     assertNotNull(requestState.getContainersOnAHost(ANY_HOST));
     assertTrue(requestState.getContainersOnAHost(ANY_HOST).size() == 1);
@@ -293,10 +134,10 @@ public class TestHostAwareContainerAllocator {
   public void testRequestContainers() throws Exception {
     Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>() {
       {
-        put(0, "abc");
-        put(1, "def");
+        put(0, "host1");
+        put(1, "host2");
         put(2, null);
-        put(3, "abc");
+        put(3, "host1");
       }
     };
     allocatorThread.start();
@@ -314,11 +155,11 @@ public class TestHostAwareContainerAllocator {
     assertNotNull(requestState.getRequestsToCountMap());
     Map<String, AtomicInteger> requestsMap = requestState.getRequestsToCountMap();
 
-    assertNotNull(requestsMap.get("abc"));
-    assertEquals(2, requestsMap.get("abc").get());
+    assertNotNull(requestsMap.get("host1"));
+    assertEquals(2, requestsMap.get("host1").get());
 
-    assertNotNull(requestsMap.get("def"));
-    assertEquals(1, requestsMap.get("def").get());
+    assertNotNull(requestsMap.get("host2"));
+    assertEquals(1, requestsMap.get("host2").get());
 
     assertNotNull(requestsMap.get(ANY_HOST));
     assertEquals(1, requestsMap.get(ANY_HOST).get());
@@ -331,8 +172,8 @@ public class TestHostAwareContainerAllocator {
   public void testExpiredRequestHandling() throws Exception {
     Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>() {
       {
-        put(0, "abc");
-        put(1, "def");
+        put(0, "requestedHost1");
+        put(1, "requestedHost2");
       }
     };
     containerAllocator.requestContainers(containersToHostMapping);
@@ -341,51 +182,72 @@ public class TestHostAwareContainerAllocator {
     assertTrue(requestState.getRequestsQueue().size() == 2);
 
     assertNotNull(requestState.getRequestsToCountMap());
-    assertNotNull(requestState.getRequestsToCountMap().get("abc"));
-    assertTrue(requestState.getRequestsToCountMap().get("abc").get() == 1);
+    assertNotNull(requestState.getRequestsToCountMap().get("requestedHost1"));
+    assertTrue(requestState.getRequestsToCountMap().get("requestedHost1").get() == 1);
 
-    assertNotNull(requestState.getRequestsToCountMap().get("def"));
-    assertTrue(requestState.getRequestsToCountMap().get("def").get() == 1);
+    assertNotNull(requestState.getRequestsToCountMap().get("requestedHost2"));
+    assertTrue(requestState.getRequestsToCountMap().get("requestedHost2").get() == 1);
 
-    // Set up our final asserts before starting the allocator thread
-    MockContainerListener listener = new MockContainerListener(2, 0, 0, new Runnable() {
+    final Container container0 = TestUtil
+        .getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "availableHost1", 123);
+    final Container container1 = TestUtil
+        .getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "availableHost2", 123);
+
+    Runnable addedContainerAssertions = new Runnable() {
       @Override
       public void run() {
-        assertNull(requestState.getContainersOnAHost("xyz"));
-        assertNull(requestState.getContainersOnAHost("zzz"));
+        assertNotNull(requestState.getRequestsToCountMap());
+        assertNull(requestState.getContainersOnAHost("availableHost1"));
+        assertNull(requestState.getContainersOnAHost("availableHost2"));
         assertNotNull(requestState.getContainersOnAHost(ANY_HOST));
         assertTrue(requestState.getContainersOnAHost(ANY_HOST).size() == 2);
       }
-    }, null, null);
-    requestState.registerContainerListener(listener);
-
-    allocatorThread.start();
-
-    Container container0 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "xyz", 123);
-    Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "zzz", 123);
-    containerAllocator.addContainer(container0);
-    containerAllocator.addContainer(container1);
+    };
 
-    listener.verify();
+    Runnable assignedContainerAssertions = new Runnable() {
+      @Override
+      public void run() {
+        List<Container> anyHostContainers = requestState.getContainersOnAHost(ANY_HOST);
+        assertTrue(anyHostContainers == null || anyHostContainers.isEmpty());
 
-    Thread.sleep(1000);
+        assertNotNull(requestState.getRequestsQueue());
+        assertTrue(requestState.getRequestsQueue().size() == 0);
 
-    MockContainerUtil mockContainerUtil = (MockContainerUtil) containerUtil;
+        assertNotNull(requestState.getRequestsToCountMap());
+        assertNotNull(requestState.getRequestsToCountMap().get("requestedHost1"));
+        assertNotNull(requestState.getRequestsToCountMap().get("requestedHost2"));
+      }
+    };
 
-    assertNotNull(mockContainerUtil.runningContainerList.get("xyz"));
-    assertTrue(mockContainerUtil.runningContainerList.get("xyz").contains(container0));
+    Runnable runningContainerAssertions = new Runnable() {
+      @Override
+      public void run() {
+        MockContainerUtil mockContainerUtil = (MockContainerUtil) containerUtil;
 
-    assertNotNull(mockContainerUtil.runningContainerList.get("zzz"));
-    assertTrue(mockContainerUtil.runningContainerList.get("zzz").contains(container1));
+        assertNotNull(mockContainerUtil.runningContainerList.get("availableHost1"));
+        assertTrue(mockContainerUtil.runningContainerList.get("availableHost1").contains(container0));
 
-    assertNull(requestState.getContainersOnAHost(ANY_HOST));
+        assertNotNull(mockContainerUtil.runningContainerList.get("availableHost2"));
+        assertTrue(mockContainerUtil.runningContainerList.get("availableHost2").contains(container1));
+      }
+    };
+    // Set up our final asserts before starting the allocator thread
+    MockContainerListener listener = new MockContainerListener(
+        2, 0, 2, 2,
+        addedContainerAssertions,
+        null,
+        assignedContainerAssertions,
+        runningContainerAssertions);
+    requestState.registerContainerListener(listener);
+    ((MockContainerUtil) containerUtil).registerContainerListener(listener);
 
-    assertNotNull(requestState.getRequestsQueue());
-    assertTrue(requestState.getRequestsQueue().size() == 0);
+    containerAllocator.addContainer(container0);
+    containerAllocator.addContainer(container1);
 
-    assertNotNull(requestState.getRequestsToCountMap());
-    assertNull(requestState.getRequestsToCountMap().get("abc"));
+    // Start after adding containers to avoid a race condition between the allocator thread
+    // using the containers and the assertions after the containers are added.
+    allocatorThread.start();
 
-    assertNull(requestState.getRequestsToCountMap().get("def"));
+    listener.verify();
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/1497bf6c/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerListener.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerListener.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerListener.java
index cb82ccc..43bda8f 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerListener.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerListener.java
@@ -19,82 +19,130 @@
 
 package org.apache.samza.job.yarn.util;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.samza.job.yarn.SamzaContainerRequest;
-import scala.tools.nsc.Global;
 
 import static org.junit.Assert.assertTrue;
 
 public class MockContainerListener {
-  private static final int NUM_CONDITIONS = 3;
-  private boolean allContainersAdded = false;
-  private boolean allContainersReleased = false;
-  private final int numExpectedContainersAdded;
-  private final int numExpectedContainersReleased;
-  private final int numExpectedContainersAssigned;
-  private final Runnable addContainerAssertions;
-  private final Runnable releaseContainerAssertions;
-  private final Runnable assignContainerAssertions;
+  private final CountDownLatch conditionLatch;
+
+
+  private final AsyncCountableCondition containersAdded;
+  private final AsyncCountableCondition containersReleased;
+  private final AsyncCountableCondition containersAssigned;
+  private final AsyncCountableCondition containersRunning;
+
+  private final AsyncCountableCondition[] allConditions;
 
   public MockContainerListener(int numExpectedContainersAdded,
       int numExpectedContainersReleased,
       int numExpectedContainersAssigned,
+      int numExpectedContainersRunning,
       Runnable addContainerAssertions,
       Runnable releaseContainerAssertions,
-      Runnable assignContainerAssertions) {
-    this.numExpectedContainersAdded = numExpectedContainersAdded;
-    this.numExpectedContainersReleased = numExpectedContainersReleased;
-    this.numExpectedContainersAssigned = numExpectedContainersAssigned;
-    this.addContainerAssertions = addContainerAssertions;
-    this.releaseContainerAssertions = releaseContainerAssertions;
-    this.assignContainerAssertions = assignContainerAssertions;
-  }
+      Runnable assignContainerAssertions,
+      Runnable runContainerAssertions) {
+    containersAdded = new AsyncCountableCondition("containers added", numExpectedContainersAdded, addContainerAssertions);
+    containersReleased = new AsyncCountableCondition("containers released", numExpectedContainersReleased, releaseContainerAssertions);
+    containersAssigned = new AsyncCountableCondition("containers assigned", numExpectedContainersAssigned, assignContainerAssertions);
+    containersRunning = new AsyncCountableCondition("containers running", numExpectedContainersRunning, runContainerAssertions);
 
-  public synchronized void postAddContainer(Container container, int totalAddedContainers) {
-    if (totalAddedContainers == numExpectedContainersAdded) {
-      if (addContainerAssertions != null) {
-        addContainerAssertions.run();
+    allConditions = new AsyncCountableCondition[] {containersAdded, containersReleased, containersAssigned, containersRunning};
+
+    int unsatisfiedConditions = 0;
+    for (AsyncCountableCondition condition : allConditions) {
+      if (!condition.isSatisfied()) {
+        unsatisfiedConditions++;
       }
+    }
+
+    conditionLatch = new CountDownLatch(unsatisfiedConditions);
+  }
 
-      allContainersAdded = true;
-      this.notifyAll();
+  public void postAddContainer(int totalAddedContainers) {
+    if (containersAdded.update(totalAddedContainers)) {
+      conditionLatch.countDown();
     }
   }
 
-  public synchronized void postReleaseContainers(int totalReleasedContainers) {
-    if (totalReleasedContainers == numExpectedContainersReleased) {
-      if (releaseContainerAssertions != null) {
-        releaseContainerAssertions.run();
-      }
+  public void postReleaseContainers(int totalReleasedContainers) {
+    if (containersReleased.update(totalReleasedContainers)) {
+      conditionLatch.countDown();
+    }
+  }
 
-      allContainersReleased = true;
-      this.notifyAll();
+  public void postUpdateRequestStateAfterAssignment(int totalAssignedContainers) {
+    if (containersAssigned.update(totalAssignedContainers)) {
+      conditionLatch.countDown();
     }
   }
 
-  public synchronized void verify() {
-    // There could be 1 notifyAll() for each condition, so we must wait up to that many times
-    for (int i = 0; i < NUM_CONDITIONS && !(allContainersAdded && allContainersReleased); i++) {
-      try {
-        this.wait(5000);
-      } catch (InterruptedException e) {
-        // Do nothing
-      }
+  public void postRunContainer(int totalRunningContainers) {
+    if (containersRunning.update(totalRunningContainers)) {
+      conditionLatch.countDown();
     }
+  }
 
-    assertTrue("Not all containers were added.", allContainersAdded);
-    assertTrue("Not all containers were released.", allContainersReleased);
+  /**
+   * This method should be called in the main thread. It waits for all the conditions to occur in the other
+   * threads and then verifies that they were in fact satisfied.
+   */
+  public void verify()
+      throws InterruptedException {
+    conditionLatch.await(5, TimeUnit.SECONDS);
+
+    for (AsyncCountableCondition condition : allConditions) {
+      condition.verify();
+    }
   }
 
-  public void postUpdateRequestStateAfterAssignment(int totalAssignedContainers) {
-    if (totalAssignedContainers == numExpectedContainersAssigned) {
-      if (assignContainerAssertions != null) {
-        assignContainerAssertions.run();
+  private static class AsyncCountableCondition {
+    private boolean satisfied = false;
+    private final int expectedCount;
+    private final Runnable postConditionAssertions;
+    private final String name;
+    private AssertionError assertionError = null;
+
+    private AsyncCountableCondition(String name, int expectedCount, Runnable postConditionAssertions) {
+      this.name = name;
+      this.expectedCount = expectedCount;
+      if (expectedCount == 0) satisfied = true;
+      this.postConditionAssertions = postConditionAssertions;
+    }
+
+    public boolean update(int latestCount) {
+      if (!satisfied && latestCount == expectedCount) {
+        if (postConditionAssertions != null) {
+          try {
+            postConditionAssertions.run();
+          } catch (Throwable t) {
+            assertionError = new AssertionError(String.format("Assertion for '%s' failed", name), t);
+          }
+        }
+
+        satisfied = true;
+        return true;
       }
+      return false;
+    }
+
+    public boolean isSatisfied() {
+      return satisfied;
+    }
+
+    public void verify() {
+      assertTrue(String.format("Condition '%s' was not satisfied", name), isSatisfied());
+
+      if (assertionError != null) {
+        throw assertionError;
+      }
+    }
 
-      this.notifyAll();
+    @Override
+    public String toString() {
+      return name;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/1497bf6c/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java
index 879a7d0..7c0b504 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java
@@ -30,7 +30,7 @@ import org.apache.samza.job.yarn.SamzaContainerRequest;
 
 
 public class MockContainerRequestState extends ContainerRequestState {
-  private final List<MockContainerListener> _mockContainerListeners = new ArrayList<MockContainerListener>();
+  private final List<MockContainerListener> mockContainerListeners = new ArrayList<MockContainerListener>();
   private int numAddedContainers = 0;
   private int numReleasedContainers = 0;
   private int numAssignedContainers = 0;
@@ -48,7 +48,7 @@ public class MockContainerRequestState extends ContainerRequestState {
     numAssignedContainers++;
     assignedRequests.add(request);
 
-    for (MockContainerListener listener : _mockContainerListeners) {
+    for (MockContainerListener listener : mockContainerListeners) {
       listener.postUpdateRequestStateAfterAssignment(numAssignedContainers);
     }
   }
@@ -58,8 +58,8 @@ public class MockContainerRequestState extends ContainerRequestState {
     super.addContainer(container);
 
     numAddedContainers++;
-    for (MockContainerListener listener : _mockContainerListeners) {
-      listener.postAddContainer(container, numAddedContainers);
+    for (MockContainerListener listener : mockContainerListeners) {
+      listener.postAddContainer(numAddedContainers);
     }
   }
 
@@ -67,7 +67,7 @@ public class MockContainerRequestState extends ContainerRequestState {
   public synchronized int releaseExtraContainers() {
     numReleasedContainers += super.releaseExtraContainers();
 
-    for (MockContainerListener listener : _mockContainerListeners) {
+    for (MockContainerListener listener : mockContainerListeners) {
       listener.postReleaseContainers(numReleasedContainers);
     }
 
@@ -79,17 +79,12 @@ public class MockContainerRequestState extends ContainerRequestState {
     super.releaseUnstartableContainer(container);
 
     numReleasedContainers += 1;
-    for (MockContainerListener listener : _mockContainerListeners) {
+    for (MockContainerListener listener : mockContainerListeners) {
       listener.postReleaseContainers(numReleasedContainers);
     }
   }
 
   public void registerContainerListener(MockContainerListener listener) {
-    _mockContainerListeners.add(listener);
+    mockContainerListeners.add(listener);
   }
-
-  public void clearContainerListeners() {
-    _mockContainerListeners.clear();
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/1497bf6c/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java
index 2f9669f..cf3e143 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java
@@ -33,6 +33,7 @@ import org.apache.samza.job.yarn.SamzaContainerLaunchException;
 
 
 public class MockContainerUtil extends ContainerUtil {
+  private final List<MockContainerListener> mockContainerListeners = new ArrayList<MockContainerListener>();
   public final Map<String, List<Container>> runningContainerList = new HashMap<>();
   public Exception containerStartException = null;
 
@@ -54,6 +55,10 @@ public class MockContainerUtil extends ContainerUtil {
       runningContainerList.put(hostname, list);
     }
     super.runContainer(samzaContainerId, container);
+
+    for (MockContainerListener listener : mockContainerListeners) {
+      listener.postRunContainer(runningContainerList.size());
+    }
   }
 
   @Override
@@ -64,4 +69,11 @@ public class MockContainerUtil extends ContainerUtil {
     }
   }
 
+  public void registerContainerListener(MockContainerListener listener) {
+    mockContainerListeners.add(listener);
+  }
+
+  public void clearContainerListeners() {
+    mockContainerListeners.clear();
+  }
 }