You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/02/02 01:35:20 UTC
[04/50] [abbrv] samza git commit: SAMZA-786: improve reliability of
host affinity tests
SAMZA-786: improve reliability of host affinity tests
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/dfdc35e7
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/dfdc35e7
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/dfdc35e7
Branch: refs/heads/samza-sql
Commit: dfdc35e7e1484180932591b6d0e804fbcbfd7b40
Parents: 64f3f6b
Author: Jacob Maes <ja...@gmail.com>
Authored: Tue Oct 6 17:37:20 2015 -0700
Committer: Yi Pan (Data Infrastructure) <yi...@linkedin.com>
Committed: Tue Oct 6 17:37:20 2015 -0700
----------------------------------------------------------------------
.../samza/job/yarn/ContainerRequestState.java | 8 +-
.../samza/job/yarn/TestContainerAllocator.java | 91 +++++++--------
.../yarn/TestHostAwareContainerAllocator.java | 111 ++++++++++---------
.../job/yarn/util/MockContainerListener.java | 80 +++++++++++++
.../yarn/util/MockContainerRequestState.java | 69 ++++++++++++
5 files changed, 253 insertions(+), 106 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/dfdc35e7/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java
index b5e0368..4b36a91 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java
@@ -200,8 +200,11 @@ public class ContainerRequestState {
/**
* If requestQueue is empty, all extra containers in the buffer should be released and update the entire system's state
* Needs to be synchronized because it is modifying shared state buffers
+ * @return the number of containers released.
*/
- public synchronized void releaseExtraContainers() {
+ public synchronized int releaseExtraContainers() {
+ int numReleasedContainers = 0;
+
if (hostAffinityEnabled) {
if (requestsQueue.isEmpty()) {
log.info("Requests Queue is empty. Should clear up state.");
@@ -213,6 +216,7 @@ public class ContainerRequestState {
for (Container c : containers) {
log.info("Releasing extra container {} allocated on {}", c.getId(), host);
amClient.releaseAssignedContainer(c.getId());
+ numReleasedContainers++;
}
}
}
@@ -227,10 +231,12 @@ public class ContainerRequestState {
Container c = availableContainers.remove(0);
log.info("Releasing extra allocated container - {}", c.getId());
amClient.releaseAssignedContainer(c.getId());
+ numReleasedContainers++;
}
clearState();
}
}
+ return numReleasedContainers;
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/dfdc35e7/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
index 0d07e28..01f32a4 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
@@ -19,6 +19,11 @@
package org.apache.samza.job.yarn;
+import java.lang.reflect.Field;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
@@ -31,7 +36,10 @@ import org.apache.samza.coordinator.server.HttpServer;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.TaskModel;
-import org.apache.samza.job.yarn.util.*;
+import org.apache.samza.job.yarn.util.MockContainerListener;
+import org.apache.samza.job.yarn.util.MockContainerRequestState;
+import org.apache.samza.job.yarn.util.MockHttpServer;
+import org.apache.samza.job.yarn.util.TestAMRMClientImpl;
import org.apache.samza.job.yarn.util.TestUtil;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletHolder;
@@ -39,13 +47,10 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.lang.reflect.Field;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
public class TestContainerAllocator {
private final int ALLOCATOR_SLEEP_TIME = 10;
@@ -54,6 +59,7 @@ public class TestContainerAllocator {
private AMRMClientAsyncImpl amRmClientAsync;
private TestAMRMClientImpl testAMRMClient;
+ private MockContainerRequestState requestState;
private ContainerAllocator containerAllocator;
private Thread allocatorThread;
@@ -97,11 +103,16 @@ public class TestContainerAllocator {
// Initialize certain state variables (mostly to avoid NPE)
state.coordinatorUrl = new URL("http://localhost:7778/");
+ requestState = new MockContainerRequestState(amRmClientAsync, false);
containerAllocator = new ContainerAllocator(
amRmClientAsync,
TestUtil.getContainerUtil(config, state),
ALLOCATOR_SLEEP_TIME
);
+ Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState");
+ requestStateField.setAccessible(true);
+ requestStateField.set(containerAllocator, requestState);
+
allocatorThread = new Thread(containerAllocator);
}
@@ -116,21 +127,6 @@ public class TestContainerAllocator {
*/
@Test
public void testAddContainer() throws Exception {
-
- Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState");
- requestStateField.setAccessible(true);
-
- allocatorThread.start();
-
- containerAllocator.requestContainers(new HashMap<Integer, String>() {
- {
- put(0, ANY_HOST);
- put(1, ANY_HOST);
- }
- });
-
- ContainerRequestState requestState = (ContainerRequestState) requestStateField.get(containerAllocator);
-
assertNull(requestState.getContainersOnAHost("abc"));
assertNull(requestState.getContainersOnAHost(ANY_HOST));
@@ -160,10 +156,6 @@ public class TestContainerAllocator {
containerAllocator.requestContainers(containersToHostMapping);
- Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState");
- requestStateField.setAccessible(true);
- ContainerRequestState requestState = (ContainerRequestState) requestStateField.get(containerAllocator);
-
assertNotNull(testAMRMClient.requests);
assertEquals(4, testAMRMClient.requests.size());
@@ -191,10 +183,6 @@ public class TestContainerAllocator {
containerAllocator.requestContainers(containersToHostMapping);
- Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState");
- requestStateField.setAccessible(true);
- ContainerRequestState requestState = (ContainerRequestState) requestStateField.get(containerAllocator);
-
assertNotNull(requestState);
assertNotNull(requestState.getRequestsQueue());
@@ -212,9 +200,27 @@ public class TestContainerAllocator {
*/
@Test
public void testAllocatorReleasesExtraContainers() throws Exception {
- Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "abc", 123);
- Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "abc", 123);
- Container container2 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "def", 123);
+ final Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "abc", 123);
+ final Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "abc", 123);
+ final Container container2 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "def", 123);
+
+ // Set up our final asserts before starting the allocator thread
+ MockContainerListener listener = new MockContainerListener(3, 2, null, new Runnable() {
+ @Override
+ public void run() {
+ assertNotNull(testAMRMClient.getRelease());
+ assertEquals(2, testAMRMClient.getRelease().size());
+ assertTrue(testAMRMClient.getRelease().contains(container1.getId()));
+ assertTrue(testAMRMClient.getRelease().contains(container2.getId()));
+
+ // Test that state is cleaned up
+ assertEquals(0, requestState.getRequestsQueue().size());
+ assertEquals(0, requestState.getRequestsToCountMap().size());
+ assertNull(requestState.getContainersOnAHost("abc"));
+ assertNull(requestState.getContainersOnAHost("def"));
+ }
+ });
+ requestState.registerContainerListener(listener);
allocatorThread.start();
@@ -224,22 +230,7 @@ public class TestContainerAllocator {
containerAllocator.addContainer(container1);
containerAllocator.addContainer(container2);
- Thread.sleep(600);
-
- Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState");
- requestStateField.setAccessible(true);
- ContainerRequestState requestState = (ContainerRequestState) requestStateField.get(containerAllocator);
-
- assertNotNull(testAMRMClient.getRelease());
- assertEquals(2, testAMRMClient.getRelease().size());
- assertTrue(testAMRMClient.getRelease().contains(container1.getId()));
- assertTrue(testAMRMClient.getRelease().contains(container2.getId()));
-
- // Test that state is cleaned up
- assertEquals(0, requestState.getRequestsQueue().size());
- assertEquals(0, requestState.getRequestsToCountMap().size());
- assertNull(requestState.getContainersOnAHost("abc"));
- assertNull(requestState.getContainersOnAHost("def"));
+ listener.verify();
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/dfdc35e7/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
index ee4a5b0..663ea25 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
@@ -18,6 +18,12 @@
*/
package org.apache.samza.job.yarn;
+import java.lang.reflect.Field;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
@@ -30,6 +36,8 @@ import org.apache.samza.coordinator.server.HttpServer;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.job.yarn.util.MockContainerListener;
+import org.apache.samza.job.yarn.util.MockContainerRequestState;
import org.apache.samza.job.yarn.util.MockContainerUtil;
import org.apache.samza.job.yarn.util.MockHttpServer;
import org.apache.samza.job.yarn.util.TestAMRMClientImpl;
@@ -40,16 +48,10 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.lang.reflect.Field;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
public class TestHostAwareContainerAllocator {
private static final int ALLOCATOR_SLEEP_TIME = 1;
@@ -60,6 +62,7 @@ public class TestHostAwareContainerAllocator {
private AMRMClientAsyncImpl amRmClientAsync;
private TestAMRMClientImpl testAMRMClient;
+ private MockContainerRequestState requestState;
private HostAwareContainerAllocator containerAllocator;
private Thread allocatorThread;
private ContainerUtil containerUtil;
@@ -114,12 +117,17 @@ public class TestHostAwareContainerAllocator {
containerUtil = TestUtil.getContainerUtil(getConfig(), state);
+ requestState = new MockContainerRequestState(amRmClientAsync, true);
containerAllocator = new HostAwareContainerAllocator(
amRmClientAsync,
containerUtil,
ALLOCATOR_SLEEP_TIME,
CONTAINER_REQUEST_TIMEOUT
);
+ Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState");
+ requestStateField.setAccessible(true);
+ requestStateField.set(containerAllocator, requestState);
+
allocatorThread = new Thread(containerAllocator);
}
@@ -131,9 +139,27 @@ public class TestHostAwareContainerAllocator {
@Test
public void testAllocatorReleasesExtraContainers() throws Exception {
- Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "abc", 123);
- Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "abc", 123);
- Container container2 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "def", 123);
+ final Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "abc", 123);
+ final Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "abc", 123);
+ final Container container2 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "def", 123);
+
+ // Set up our final asserts before starting the allocator thread
+ MockContainerListener listener = new MockContainerListener(3, 2, null, new Runnable() {
+ @Override
+ public void run() {
+ assertNotNull(testAMRMClient.getRelease());
+ assertEquals(2, testAMRMClient.getRelease().size());
+ assertTrue(testAMRMClient.getRelease().contains(container1.getId()));
+ assertTrue(testAMRMClient.getRelease().contains(container2.getId()));
+
+ // Test that state is cleaned up
+ assertEquals(0, requestState.getRequestsQueue().size());
+ assertEquals(0, requestState.getRequestsToCountMap().size());
+ assertNull(requestState.getContainersOnAHost("abc"));
+ assertNull(requestState.getContainersOnAHost("def"));
+ }
+ });
+ requestState.registerContainerListener(listener);
allocatorThread.start();
@@ -143,22 +169,7 @@ public class TestHostAwareContainerAllocator {
containerAllocator.addContainer(container1);
containerAllocator.addContainer(container2);
- Thread.sleep(600);
-
- Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState");
- requestStateField.setAccessible(true);
- ContainerRequestState requestState = (ContainerRequestState) requestStateField.get(containerAllocator);
-
- assertNotNull(testAMRMClient.getRelease());
- assertEquals(2, testAMRMClient.getRelease().size());
- assertTrue(testAMRMClient.getRelease().contains(container1.getId()));
- assertTrue(testAMRMClient.getRelease().contains(container2.getId()));
-
- // Test that state is cleaned up
- assertEquals(0, requestState.getRequestsQueue().size());
- assertEquals(0, requestState.getRequestsToCountMap().size());
- assertNull(requestState.getContainersOnAHost("abc"));
- assertNull(requestState.getContainersOnAHost("def"));
+ listener.verify();
}
/**
@@ -176,10 +187,6 @@ public class TestHostAwareContainerAllocator {
containerAllocator.requestContainers(containersToHostMapping);
- Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState");
- requestStateField.setAccessible(true);
- ContainerRequestState requestState = (ContainerRequestState) requestStateField.get(containerAllocator);
-
assertNotNull(requestState);
assertNotNull(requestState.getRequestsQueue());
@@ -195,10 +202,6 @@ public class TestHostAwareContainerAllocator {
*/
@Test
public void testAddContainerWithHostAffinity() throws Exception {
- Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState");
- requestStateField.setAccessible(true);
- ContainerRequestState requestState = (ContainerRequestState) requestStateField.get(containerAllocator);
-
containerAllocator.requestContainers(new HashMap<Integer, String>() {
{
put(0, "abc");
@@ -206,8 +209,6 @@ public class TestHostAwareContainerAllocator {
}
});
- allocatorThread.start();
-
assertNotNull(requestState.getContainersOnAHost("abc"));
assertEquals(0, requestState.getContainersOnAHost("abc").size());
@@ -221,16 +222,18 @@ public class TestHostAwareContainerAllocator {
containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "xyz", 123));
assertNotNull(requestState.getContainersOnAHost("abc"));
- assertTrue(requestState.getContainersOnAHost("abc").size() == 1);
+ assertEquals(1, requestState.getContainersOnAHost("abc").size());
assertNotNull(requestState.getContainersOnAHost("xyz"));
- assertTrue(requestState.getContainersOnAHost("xyz").size() == 1);
+ assertEquals(1, requestState.getContainersOnAHost("xyz").size());
assertNotNull(requestState.getContainersOnAHost(ANY_HOST));
assertTrue(requestState.getContainersOnAHost(ANY_HOST).size() == 1);
- assertEquals(ConverterUtils.toContainerId("container_1350670447861_0003_01_000004"), requestState.getContainersOnAHost(ANY_HOST).get(0).getId());
+ assertEquals(ConverterUtils.toContainerId("container_1350670447861_0003_01_000004"),
+ requestState.getContainersOnAHost(ANY_HOST).get(0).getId());
}
+
@Test
public void testRequestContainers() throws Exception {
Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>() {
@@ -245,11 +248,6 @@ public class TestHostAwareContainerAllocator {
containerAllocator.requestContainers(containersToHostMapping);
-
- Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState");
- requestStateField.setAccessible(true);
- ContainerRequestState requestState = (ContainerRequestState) requestStateField.get(containerAllocator);
-
assertNotNull(testAMRMClient.requests);
assertEquals(4, testAMRMClient.requests.size());
@@ -284,10 +282,6 @@ public class TestHostAwareContainerAllocator {
};
containerAllocator.requestContainers(containersToHostMapping);
- Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState");
- requestStateField.setAccessible(true);
- ContainerRequestState requestState = (ContainerRequestState) requestStateField.get(containerAllocator);
-
assertNotNull(requestState.getRequestsQueue());
assertTrue(requestState.getRequestsQueue().size() == 2);
@@ -298,19 +292,26 @@ public class TestHostAwareContainerAllocator {
assertNotNull(requestState.getRequestsToCountMap().get("def"));
assertTrue(requestState.getRequestsToCountMap().get("def").get() == 1);
- allocatorThread.start();
+ // Set up our final asserts before starting the allocator thread
+ MockContainerListener listener = new MockContainerListener(2, 0, new Runnable() {
+ @Override
+ public void run() {
+ assertNull(requestState.getContainersOnAHost("xyz"));
+ assertNull(requestState.getContainersOnAHost("zzz"));
+ assertNotNull(requestState.getContainersOnAHost(ANY_HOST));
+ assertTrue(requestState.getContainersOnAHost(ANY_HOST).size() == 2);
+ }
+ }, null);
+ requestState.registerContainerListener(listener);
- Thread.sleep(600);
+ allocatorThread.start();
Container container0 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "xyz", 123);
Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "zzz", 123);
containerAllocator.addContainer(container0);
containerAllocator.addContainer(container1);
- assertNull(requestState.getContainersOnAHost("xyz"));
- assertNull(requestState.getContainersOnAHost("zzz"));
- assertNotNull(requestState.getContainersOnAHost(ANY_HOST));
- assertTrue(requestState.getContainersOnAHost(ANY_HOST).size() == 2);
+ listener.verify();
Thread.sleep(1000);
http://git-wip-us.apache.org/repos/asf/samza/blob/dfdc35e7/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerListener.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerListener.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerListener.java
new file mode 100644
index 0000000..8fc0b98
--- /dev/null
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerListener.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn.util;
+
+import org.apache.hadoop.yarn.api.records.Container;
+
+import static org.junit.Assert.assertTrue;
+
+public class MockContainerListener {
+ private static final int NUM_CONDITIONS = 2;
+ private boolean allContainersAdded = false;
+ private boolean allContainersReleased = false;
+ private final int numExpectedContainersAdded;
+ private final int numExpectedContainersReleased;
+ private final Runnable addContainerAssertions;
+ private final Runnable releaseContainerAssertions;
+
+ public MockContainerListener(int numExpectedContainersAdded,
+ int numExpectedContainersReleased,
+ Runnable addContainerAssertions,
+ Runnable releaseContainerAssertions) {
+ this.numExpectedContainersAdded = numExpectedContainersAdded;
+ this.numExpectedContainersReleased = numExpectedContainersReleased;
+ this.addContainerAssertions = addContainerAssertions;
+ this.releaseContainerAssertions = releaseContainerAssertions;
+ }
+
+ public synchronized void postAddContainer(Container container, int totalAddedContainers) {
+ if (totalAddedContainers == numExpectedContainersAdded) {
+ if (addContainerAssertions != null) {
+ addContainerAssertions.run();
+ }
+
+ allContainersAdded = true;
+ this.notifyAll();
+ }
+ }
+
+ public synchronized void postReleaseContainers(int totalReleasedContainers) {
+ if (totalReleasedContainers == numExpectedContainersReleased) {
+ if (releaseContainerAssertions != null) {
+ releaseContainerAssertions.run();
+ }
+
+ allContainersReleased = true;
+ this.notifyAll();
+ }
+ }
+
+ public synchronized void verify() {
+ // There could be 1 notifyAll() for each condition, so we must wait up to that many times
+ for (int i = 0; i < NUM_CONDITIONS && !(allContainersAdded && allContainersReleased); i++) {
+ try {
+ this.wait(5000);
+ } catch (InterruptedException e) {
+ // Do nothing
+ }
+ }
+
+ assertTrue("Not all containers were added.", allContainersAdded);
+ assertTrue("Not all containers were released.", allContainersReleased);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/dfdc35e7/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java
new file mode 100644
index 0000000..e7441e5
--- /dev/null
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.samza.job.yarn.ContainerRequestState;
+
+
+public class MockContainerRequestState extends ContainerRequestState {
+ private final List<MockContainerListener> _mockContainerListeners = new ArrayList<MockContainerListener>();
+ private int numAddedContainers = 0;
+ private int numReleasedContainers = 0;
+
+ public MockContainerRequestState(AMRMClientAsync<AMRMClient.ContainerRequest> amClient,
+ boolean hostAffinityEnabled) {
+ super(amClient, hostAffinityEnabled);
+ }
+
+
+ @Override
+ public synchronized void addContainer(Container container) {
+ super.addContainer(container);
+
+ numAddedContainers++;
+ for (MockContainerListener listener : _mockContainerListeners) {
+ listener.postAddContainer(container, numAddedContainers);
+ }
+ }
+
+ @Override
+ public synchronized int releaseExtraContainers() {
+ numReleasedContainers += super.releaseExtraContainers();
+
+ for (MockContainerListener listener : _mockContainerListeners) {
+ listener.postReleaseContainers(numReleasedContainers);
+ }
+
+ return numAddedContainers;
+ }
+
+ public void registerContainerListener(MockContainerListener listener) {
+ _mockContainerListeners.add(listener);
+ }
+
+ public void clearContainerListeners() {
+ _mockContainerListeners.clear();
+ }
+
+}