You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/02/02 01:35:20 UTC

[04/50] [abbrv] samza git commit: SAMZA-786: improve reliability of host affinity tests

SAMZA-786: improve reliability of host affinity tests


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/dfdc35e7
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/dfdc35e7
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/dfdc35e7

Branch: refs/heads/samza-sql
Commit: dfdc35e7e1484180932591b6d0e804fbcbfd7b40
Parents: 64f3f6b
Author: Jacob Maes <ja...@gmail.com>
Authored: Tue Oct 6 17:37:20 2015 -0700
Committer: Yi Pan (Data Infrastructure) <yi...@linkedin.com>
Committed: Tue Oct 6 17:37:20 2015 -0700

----------------------------------------------------------------------
 .../samza/job/yarn/ContainerRequestState.java   |   8 +-
 .../samza/job/yarn/TestContainerAllocator.java  |  91 +++++++--------
 .../yarn/TestHostAwareContainerAllocator.java   | 111 ++++++++++---------
 .../job/yarn/util/MockContainerListener.java    |  80 +++++++++++++
 .../yarn/util/MockContainerRequestState.java    |  69 ++++++++++++
 5 files changed, 253 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/dfdc35e7/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java
index b5e0368..4b36a91 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java
@@ -200,8 +200,11 @@ public class ContainerRequestState {
   /**
    * If requestQueue is empty, all extra containers in the buffer should be released and update the entire system's state
    * Needs to be synchronized because it is modifying shared state buffers
+   * @return the number of containers released.
    */
-  public synchronized void releaseExtraContainers() {
+  public synchronized int releaseExtraContainers() {
+    int numReleasedContainers = 0;
+
     if (hostAffinityEnabled) {
       if (requestsQueue.isEmpty()) {
         log.info("Requests Queue is empty. Should clear up state.");
@@ -213,6 +216,7 @@ public class ContainerRequestState {
             for (Container c : containers) {
               log.info("Releasing extra container {} allocated on {}", c.getId(), host);
               amClient.releaseAssignedContainer(c.getId());
+              numReleasedContainers++;
             }
           }
         }
@@ -227,10 +231,12 @@ public class ContainerRequestState {
           Container c = availableContainers.remove(0);
           log.info("Releasing extra allocated container - {}", c.getId());
           amClient.releaseAssignedContainer(c.getId());
+          numReleasedContainers++;
         }
         clearState();
       }
     }
+    return numReleasedContainers;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/dfdc35e7/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
index 0d07e28..01f32a4 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
@@ -19,6 +19,11 @@
 
 package org.apache.samza.job.yarn;
 
+import java.lang.reflect.Field;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
@@ -31,7 +36,10 @@ import org.apache.samza.coordinator.server.HttpServer;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.TaskModel;
-import org.apache.samza.job.yarn.util.*;
+import org.apache.samza.job.yarn.util.MockContainerListener;
+import org.apache.samza.job.yarn.util.MockContainerRequestState;
+import org.apache.samza.job.yarn.util.MockHttpServer;
+import org.apache.samza.job.yarn.util.TestAMRMClientImpl;
 import org.apache.samza.job.yarn.util.TestUtil;
 import org.eclipse.jetty.servlet.DefaultServlet;
 import org.eclipse.jetty.servlet.ServletHolder;
@@ -39,13 +47,10 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.lang.reflect.Field;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class TestContainerAllocator {
   private final int ALLOCATOR_SLEEP_TIME = 10;
@@ -54,6 +59,7 @@ public class TestContainerAllocator {
 
   private AMRMClientAsyncImpl amRmClientAsync;
   private TestAMRMClientImpl testAMRMClient;
+  private MockContainerRequestState requestState;
   private ContainerAllocator containerAllocator;
   private Thread allocatorThread;
 
@@ -97,11 +103,16 @@ public class TestContainerAllocator {
     // Initialize certain state variables (mostly to avoid NPE)
     state.coordinatorUrl = new URL("http://localhost:7778/");
 
+    requestState = new MockContainerRequestState(amRmClientAsync, false);
     containerAllocator = new ContainerAllocator(
         amRmClientAsync,
         TestUtil.getContainerUtil(config, state),
         ALLOCATOR_SLEEP_TIME
     );
+    Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState");
+    requestStateField.setAccessible(true);
+    requestStateField.set(containerAllocator, requestState);
+
     allocatorThread = new Thread(containerAllocator);
   }
 
@@ -116,21 +127,6 @@ public class TestContainerAllocator {
    */
   @Test
   public void testAddContainer() throws Exception {
-
-    Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState");
-    requestStateField.setAccessible(true);
-
-    allocatorThread.start();
-
-    containerAllocator.requestContainers(new HashMap<Integer, String>() {
-      {
-        put(0, ANY_HOST);
-        put(1, ANY_HOST);
-      }
-    });
-
-    ContainerRequestState requestState = (ContainerRequestState) requestStateField.get(containerAllocator);
-
     assertNull(requestState.getContainersOnAHost("abc"));
     assertNull(requestState.getContainersOnAHost(ANY_HOST));
 
@@ -160,10 +156,6 @@ public class TestContainerAllocator {
 
     containerAllocator.requestContainers(containersToHostMapping);
 
-    Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState");
-    requestStateField.setAccessible(true);
-    ContainerRequestState requestState = (ContainerRequestState) requestStateField.get(containerAllocator);
-
     assertNotNull(testAMRMClient.requests);
     assertEquals(4, testAMRMClient.requests.size());
 
@@ -191,10 +183,6 @@ public class TestContainerAllocator {
 
     containerAllocator.requestContainers(containersToHostMapping);
 
-    Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState");
-    requestStateField.setAccessible(true);
-    ContainerRequestState requestState = (ContainerRequestState) requestStateField.get(containerAllocator);
-
     assertNotNull(requestState);
 
     assertNotNull(requestState.getRequestsQueue());
@@ -212,9 +200,27 @@ public class TestContainerAllocator {
    */
   @Test
   public void testAllocatorReleasesExtraContainers() throws Exception {
-    Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "abc", 123);
-    Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "abc", 123);
-    Container container2 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "def", 123);
+    final Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "abc", 123);
+    final Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "abc", 123);
+    final Container container2 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "def", 123);
+
+    // Set up our final asserts before starting the allocator thread
+    MockContainerListener listener = new MockContainerListener(3, 2, null, new Runnable() {
+      @Override
+      public void run() {
+        assertNotNull(testAMRMClient.getRelease());
+        assertEquals(2, testAMRMClient.getRelease().size());
+        assertTrue(testAMRMClient.getRelease().contains(container1.getId()));
+        assertTrue(testAMRMClient.getRelease().contains(container2.getId()));
+
+        // Test that state is cleaned up
+        assertEquals(0, requestState.getRequestsQueue().size());
+        assertEquals(0, requestState.getRequestsToCountMap().size());
+        assertNull(requestState.getContainersOnAHost("abc"));
+        assertNull(requestState.getContainersOnAHost("def"));
+      }
+    });
+    requestState.registerContainerListener(listener);
 
     allocatorThread.start();
 
@@ -224,22 +230,7 @@ public class TestContainerAllocator {
     containerAllocator.addContainer(container1);
     containerAllocator.addContainer(container2);
 
-    Thread.sleep(600);
-
-    Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState");
-    requestStateField.setAccessible(true);
-    ContainerRequestState requestState = (ContainerRequestState) requestStateField.get(containerAllocator);
-
-    assertNotNull(testAMRMClient.getRelease());
-    assertEquals(2, testAMRMClient.getRelease().size());
-    assertTrue(testAMRMClient.getRelease().contains(container1.getId()));
-    assertTrue(testAMRMClient.getRelease().contains(container2.getId()));
-
-    // Test that state is cleaned up
-    assertEquals(0, requestState.getRequestsQueue().size());
-    assertEquals(0, requestState.getRequestsToCountMap().size());
-    assertNull(requestState.getContainersOnAHost("abc"));
-    assertNull(requestState.getContainersOnAHost("def"));
+    listener.verify();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/dfdc35e7/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
index ee4a5b0..663ea25 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
@@ -18,6 +18,12 @@
  */
 package org.apache.samza.job.yarn;
 
+import java.lang.reflect.Field;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
@@ -30,6 +36,8 @@ import org.apache.samza.coordinator.server.HttpServer;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.job.yarn.util.MockContainerListener;
+import org.apache.samza.job.yarn.util.MockContainerRequestState;
 import org.apache.samza.job.yarn.util.MockContainerUtil;
 import org.apache.samza.job.yarn.util.MockHttpServer;
 import org.apache.samza.job.yarn.util.TestAMRMClientImpl;
@@ -40,16 +48,10 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.lang.reflect.Field;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.junit.Assert.*;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class TestHostAwareContainerAllocator {
   private static final int ALLOCATOR_SLEEP_TIME = 1;
@@ -60,6 +62,7 @@ public class TestHostAwareContainerAllocator {
 
   private AMRMClientAsyncImpl amRmClientAsync;
   private TestAMRMClientImpl testAMRMClient;
+  private MockContainerRequestState requestState;
   private HostAwareContainerAllocator containerAllocator;
   private Thread allocatorThread;
   private ContainerUtil containerUtil;
@@ -114,12 +117,17 @@ public class TestHostAwareContainerAllocator {
 
     containerUtil = TestUtil.getContainerUtil(getConfig(), state);
 
+    requestState = new MockContainerRequestState(amRmClientAsync, true);
     containerAllocator = new HostAwareContainerAllocator(
         amRmClientAsync,
         containerUtil,
         ALLOCATOR_SLEEP_TIME,
         CONTAINER_REQUEST_TIMEOUT
     );
+    Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState");
+    requestStateField.setAccessible(true);
+    requestStateField.set(containerAllocator, requestState);
+
     allocatorThread = new Thread(containerAllocator);
   }
 
@@ -131,9 +139,27 @@ public class TestHostAwareContainerAllocator {
 
   @Test
   public void testAllocatorReleasesExtraContainers() throws Exception {
-    Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "abc", 123);
-    Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "abc", 123);
-    Container container2 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "def", 123);
+    final Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "abc", 123);
+    final Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "abc", 123);
+    final Container container2 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "def", 123);
+
+    // Set up our final asserts before starting the allocator thread
+    MockContainerListener listener = new MockContainerListener(3, 2, null, new Runnable() {
+      @Override
+      public void run() {
+        assertNotNull(testAMRMClient.getRelease());
+        assertEquals(2, testAMRMClient.getRelease().size());
+        assertTrue(testAMRMClient.getRelease().contains(container1.getId()));
+        assertTrue(testAMRMClient.getRelease().contains(container2.getId()));
+
+        // Test that state is cleaned up
+        assertEquals(0, requestState.getRequestsQueue().size());
+        assertEquals(0, requestState.getRequestsToCountMap().size());
+        assertNull(requestState.getContainersOnAHost("abc"));
+        assertNull(requestState.getContainersOnAHost("def"));
+      }
+    });
+    requestState.registerContainerListener(listener);
 
     allocatorThread.start();
 
@@ -143,22 +169,7 @@ public class TestHostAwareContainerAllocator {
     containerAllocator.addContainer(container1);
     containerAllocator.addContainer(container2);
 
-    Thread.sleep(600);
-
-    Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState");
-    requestStateField.setAccessible(true);
-    ContainerRequestState requestState = (ContainerRequestState) requestStateField.get(containerAllocator);
-
-    assertNotNull(testAMRMClient.getRelease());
-    assertEquals(2, testAMRMClient.getRelease().size());
-    assertTrue(testAMRMClient.getRelease().contains(container1.getId()));
-    assertTrue(testAMRMClient.getRelease().contains(container2.getId()));
-
-    // Test that state is cleaned up
-    assertEquals(0, requestState.getRequestsQueue().size());
-    assertEquals(0, requestState.getRequestsToCountMap().size());
-    assertNull(requestState.getContainersOnAHost("abc"));
-    assertNull(requestState.getContainersOnAHost("def"));
+    listener.verify();
   }
 
   /**
@@ -176,10 +187,6 @@ public class TestHostAwareContainerAllocator {
 
     containerAllocator.requestContainers(containersToHostMapping);
 
-    Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState");
-    requestStateField.setAccessible(true);
-    ContainerRequestState requestState = (ContainerRequestState) requestStateField.get(containerAllocator);
-
     assertNotNull(requestState);
 
     assertNotNull(requestState.getRequestsQueue());
@@ -195,10 +202,6 @@ public class TestHostAwareContainerAllocator {
    */
   @Test
   public void testAddContainerWithHostAffinity() throws Exception {
-    Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState");
-    requestStateField.setAccessible(true);
-    ContainerRequestState requestState = (ContainerRequestState) requestStateField.get(containerAllocator);
-
     containerAllocator.requestContainers(new HashMap<Integer, String>() {
       {
         put(0, "abc");
@@ -206,8 +209,6 @@ public class TestHostAwareContainerAllocator {
       }
     });
 
-    allocatorThread.start();
-
     assertNotNull(requestState.getContainersOnAHost("abc"));
     assertEquals(0, requestState.getContainersOnAHost("abc").size());
 
@@ -221,16 +222,18 @@ public class TestHostAwareContainerAllocator {
     containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "xyz", 123));
 
     assertNotNull(requestState.getContainersOnAHost("abc"));
-    assertTrue(requestState.getContainersOnAHost("abc").size() == 1);
+    assertEquals(1, requestState.getContainersOnAHost("abc").size());
 
     assertNotNull(requestState.getContainersOnAHost("xyz"));
-    assertTrue(requestState.getContainersOnAHost("xyz").size() == 1);
+    assertEquals(1, requestState.getContainersOnAHost("xyz").size());
 
     assertNotNull(requestState.getContainersOnAHost(ANY_HOST));
     assertTrue(requestState.getContainersOnAHost(ANY_HOST).size() == 1);
-    assertEquals(ConverterUtils.toContainerId("container_1350670447861_0003_01_000004"), requestState.getContainersOnAHost(ANY_HOST).get(0).getId());
+    assertEquals(ConverterUtils.toContainerId("container_1350670447861_0003_01_000004"),
+        requestState.getContainersOnAHost(ANY_HOST).get(0).getId());
   }
 
+
   @Test
   public void testRequestContainers() throws Exception {
     Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>() {
@@ -245,11 +248,6 @@ public class TestHostAwareContainerAllocator {
 
     containerAllocator.requestContainers(containersToHostMapping);
 
-
-    Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState");
-    requestStateField.setAccessible(true);
-    ContainerRequestState requestState = (ContainerRequestState) requestStateField.get(containerAllocator);
-
     assertNotNull(testAMRMClient.requests);
     assertEquals(4, testAMRMClient.requests.size());
 
@@ -284,10 +282,6 @@ public class TestHostAwareContainerAllocator {
     };
     containerAllocator.requestContainers(containersToHostMapping);
 
-    Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState");
-    requestStateField.setAccessible(true);
-    ContainerRequestState requestState = (ContainerRequestState) requestStateField.get(containerAllocator);
-
     assertNotNull(requestState.getRequestsQueue());
     assertTrue(requestState.getRequestsQueue().size() == 2);
 
@@ -298,19 +292,26 @@ public class TestHostAwareContainerAllocator {
     assertNotNull(requestState.getRequestsToCountMap().get("def"));
     assertTrue(requestState.getRequestsToCountMap().get("def").get() == 1);
 
-    allocatorThread.start();
+    // Set up our final asserts before starting the allocator thread
+    MockContainerListener listener = new MockContainerListener(2, 0, new Runnable() {
+      @Override
+      public void run() {
+        assertNull(requestState.getContainersOnAHost("xyz"));
+        assertNull(requestState.getContainersOnAHost("zzz"));
+        assertNotNull(requestState.getContainersOnAHost(ANY_HOST));
+        assertTrue(requestState.getContainersOnAHost(ANY_HOST).size() == 2);
+      }
+    }, null);
+    requestState.registerContainerListener(listener);
 
-    Thread.sleep(600);
+    allocatorThread.start();
 
     Container container0 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "xyz", 123);
     Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "zzz", 123);
     containerAllocator.addContainer(container0);
     containerAllocator.addContainer(container1);
 
-    assertNull(requestState.getContainersOnAHost("xyz"));
-    assertNull(requestState.getContainersOnAHost("zzz"));
-    assertNotNull(requestState.getContainersOnAHost(ANY_HOST));
-    assertTrue(requestState.getContainersOnAHost(ANY_HOST).size() == 2);
+    listener.verify();
 
     Thread.sleep(1000);
 

http://git-wip-us.apache.org/repos/asf/samza/blob/dfdc35e7/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerListener.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerListener.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerListener.java
new file mode 100644
index 0000000..8fc0b98
--- /dev/null
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerListener.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn.util;
+
+import org.apache.hadoop.yarn.api.records.Container;
+
+import static org.junit.Assert.assertTrue;
+
+public class MockContainerListener {
+  private static final int NUM_CONDITIONS = 2;
+  private boolean allContainersAdded = false;
+  private boolean allContainersReleased = false;
+  private final int numExpectedContainersAdded;
+  private final int numExpectedContainersReleased;
+  private final Runnable addContainerAssertions;
+  private final Runnable releaseContainerAssertions;
+
+  public MockContainerListener(int numExpectedContainersAdded,
+      int numExpectedContainersReleased,
+      Runnable addContainerAssertions,
+      Runnable releaseContainerAssertions) {
+    this.numExpectedContainersAdded = numExpectedContainersAdded;
+    this.numExpectedContainersReleased = numExpectedContainersReleased;
+    this.addContainerAssertions = addContainerAssertions;
+    this.releaseContainerAssertions = releaseContainerAssertions;
+  }
+
+  public synchronized void postAddContainer(Container container, int totalAddedContainers) {
+    if (totalAddedContainers == numExpectedContainersAdded) {
+      if (addContainerAssertions != null) {
+        addContainerAssertions.run();
+      }
+
+      allContainersAdded = true;
+      this.notifyAll();
+    }
+  }
+
+  public synchronized void postReleaseContainers(int totalReleasedContainers) {
+    if (totalReleasedContainers == numExpectedContainersReleased) {
+      if (releaseContainerAssertions != null) {
+        releaseContainerAssertions.run();
+      }
+
+      allContainersReleased = true;
+      this.notifyAll();
+    }
+  }
+
+  public synchronized void verify() {
+    // There could be 1 notifyAll() for each condition, so we must wait up to that many times
+    for (int i = 0; i < NUM_CONDITIONS && !(allContainersAdded && allContainersReleased); i++) {
+      try {
+        this.wait(5000);
+      } catch (InterruptedException e) {
+        // Do nothing
+      }
+    }
+
+    assertTrue("Not all containers were added.", allContainersAdded);
+    assertTrue("Not all containers were released.", allContainersReleased);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/dfdc35e7/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java
new file mode 100644
index 0000000..e7441e5
--- /dev/null
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.samza.job.yarn.ContainerRequestState;
+
+
+public class MockContainerRequestState extends ContainerRequestState {
+  private final List<MockContainerListener> _mockContainerListeners = new ArrayList<MockContainerListener>();
+  private int numAddedContainers = 0;
+  private int numReleasedContainers = 0;
+
+  public MockContainerRequestState(AMRMClientAsync<AMRMClient.ContainerRequest> amClient,
+      boolean hostAffinityEnabled) {
+    super(amClient, hostAffinityEnabled);
+  }
+
+
+  @Override
+  public synchronized void addContainer(Container container) {
+    super.addContainer(container);
+
+    numAddedContainers++;
+    for (MockContainerListener listener : _mockContainerListeners) {
+      listener.postAddContainer(container, numAddedContainers);
+    }
+  }
+
+  @Override
+  public synchronized int releaseExtraContainers() {
+    numReleasedContainers += super.releaseExtraContainers();
+
+    for (MockContainerListener listener : _mockContainerListeners) {
+      listener.postReleaseContainers(numReleasedContainers);
+    }
+
+    return numAddedContainers;
+  }
+
+  public void registerContainerListener(MockContainerListener listener) {
+    _mockContainerListeners.add(listener);
+  }
+
+  public void clearContainerListeners() {
+    _mockContainerListeners.clear();
+  }
+
+}