You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2016/04/16 00:19:36 UTC
samza git commit: SAMZA-910 - Host Affinity - Fix sporadic failures
in Container Allocator tests
Repository: samza
Updated Branches:
refs/heads/master ed5be4f92 -> 1497bf6c2
SAMZA-910 - Host Affinity - Fix sporadic failures in Container Allocator tests
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1497bf6c
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1497bf6c
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1497bf6c
Branch: refs/heads/master
Commit: 1497bf6c2629dd165f02352480584d3b3a9e5c18
Parents: ed5be4f
Author: Jacob Maes <ja...@gmail.com>
Authored: Fri Apr 15 14:21:16 2016 -0700
Committer: Navina Ramesh <nr...@linkedin.com>
Committed: Fri Apr 15 14:21:16 2016 -0700
----------------------------------------------------------------------
.../samza/job/yarn/TestContainerAllocator.java | 235 +++-----------
.../job/yarn/TestContainerAllocatorCommon.java | 225 ++++++++++++++
.../yarn/TestHostAwareContainerAllocator.java | 306 +++++--------------
.../job/yarn/util/MockContainerListener.java | 146 ++++++---
.../yarn/util/MockContainerRequestState.java | 19 +-
.../samza/job/yarn/util/MockContainerUtil.java | 12 +
6 files changed, 464 insertions(+), 479 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/1497bf6c/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
index b253f98..e21aded 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
@@ -19,35 +19,15 @@
package org.apache.samza.job.yarn;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.net.URL;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.YarnConfig;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.coordinator.JobCoordinator;
-import org.apache.samza.coordinator.server.HttpServer;
-import org.apache.samza.job.model.ContainerModel;
-import org.apache.samza.job.model.JobModel;
-import org.apache.samza.job.model.TaskModel;
-import org.apache.samza.job.yarn.util.MockContainerListener;
import org.apache.samza.job.yarn.util.MockContainerRequestState;
-import org.apache.samza.job.yarn.util.MockContainerUtil;
-import org.apache.samza.job.yarn.util.MockHttpServer;
-import org.apache.samza.job.yarn.util.TestAMRMClientImpl;
import org.apache.samza.job.yarn.util.TestUtil;
-import org.eclipse.jetty.servlet.DefaultServlet;
-import org.eclipse.jetty.servlet.ServletHolder;
-import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -55,18 +35,9 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-public class TestContainerAllocator {
- private static final String ANY_HOST = ContainerRequestState.ANY_HOST;
- private final HttpServer server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class));
+public class TestContainerAllocator extends TestContainerAllocatorCommon {
- private AMRMClientAsyncImpl amRmClientAsync;
- private TestAMRMClientImpl testAMRMClient;
- private MockContainerRequestState requestState;
- private ContainerAllocator containerAllocator;
- private ContainerUtil containerUtil;
- private Thread allocatorThread;
-
- private Config config = new MapConfig(new HashMap<String, String>() {
+ private final Config config = new MapConfig(new HashMap<String, String>() {
{
put("yarn.container.count", "1");
put("systems.test-system.samza.factory", "org.apache.samza.job.yarn.MockSystemFactory");
@@ -77,68 +48,57 @@ public class TestContainerAllocator {
put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde");
put("yarn.container.retry.count", "1");
put("yarn.container.retry.window.ms", "1999999999");
- put("yarn.allocator.sleep.ms", "10");
+ put("yarn.container.request.timeout.ms", "3");
+ put("yarn.allocator.sleep.ms", "1");
}
});
- private SamzaAppState state = new SamzaAppState(getCoordinator(1), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2);
+ @Override
+ protected Config getConfig() {
+ return config;
+ }
- private JobCoordinator getCoordinator(int containerCount) {
- Map<Integer, ContainerModel> containers = new java.util.HashMap<>();
- for (int i = 0; i < containerCount; i++) {
- ContainerModel container = new ContainerModel(i, new HashMap<TaskName, TaskModel>());
- containers.put(i, container);
- }
- JobModel jobModel = new JobModel(config, containers);
- return new JobCoordinator(jobModel, server, null);
+ @Override
+ protected MockContainerRequestState createContainerRequestState(
+ AMRMClientAsync<AMRMClient.ContainerRequest> amClient) {
+ return new MockContainerRequestState(amClient, false);
}
- @Before
- public void setup() throws Exception {
- // Create AMRMClient
- testAMRMClient = new TestAMRMClientImpl(
- TestUtil.getAppMasterResponse(
- false,
- new ArrayList<Container>(),
- new ArrayList<ContainerStatus>()
- ));
- amRmClientAsync = TestUtil.getAMClient(testAMRMClient);
+ /**
+ * Test request containers with no containerToHostMapping makes the right number of requests
+ */
+ @Test
+ public void testRequestContainersWithNoMapping() throws Exception {
+ int containerCount = 4;
+ Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>();
+ for (int i = 0; i < containerCount; i++) {
+ containersToHostMapping.put(i, null);
+ }
+ allocatorThread.start();
- // Initialize certain state variables (mostly to avoid NPE)
- state.coordinatorUrl = new URL("http://localhost:7778/");
+ containerAllocator.requestContainers(containersToHostMapping);
- requestState = new MockContainerRequestState(amRmClientAsync, false);
- containerUtil = TestUtil.getContainerUtil(config, state);
- containerAllocator = new ContainerAllocator(
- amRmClientAsync,
- containerUtil,
- new YarnConfig(config)
- );
- Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState");
- requestStateField.setAccessible(true);
- requestStateField.set(containerAllocator, requestState);
+ assertNotNull(requestState);
- allocatorThread = new Thread(containerAllocator);
- }
+ assertNotNull(requestState.getRequestsQueue());
+ assertTrue(requestState.getRequestsQueue().size() == 4);
- @After
- public void teardown() throws Exception {
- containerAllocator.setIsRunning(false);
- allocatorThread.join();
+ // If host-affinty is not enabled, it doesn't update the requestMap
+ assertNotNull(requestState.getRequestsToCountMap());
+ assertTrue(requestState.getRequestsToCountMap().keySet().size() == 0);
}
-
/**
* Adds all containers returned to ANY_HOST only
*/
@Test
public void testAddContainer() throws Exception {
- assertNull(requestState.getContainersOnAHost("abc"));
+ assertNull(requestState.getContainersOnAHost("host1"));
assertNull(requestState.getContainersOnAHost(ANY_HOST));
- containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "abc", 123));
- containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "xyz", 123));
+ containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "host1", 123));
+ containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "host3", 123));
- assertNull(requestState.getContainersOnAHost("abc"));
+ assertNull(requestState.getContainersOnAHost("host1"));
assertNotNull(requestState.getContainersOnAHost(ANY_HOST));
assertTrue(requestState.getContainersOnAHost(ANY_HOST).size() == 2);
}
@@ -150,10 +110,10 @@ public class TestContainerAllocator {
public void testRequestContainers() throws Exception {
Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>() {
{
- put(0, "abc");
- put(1, "def");
+ put(0, "host1");
+ put(1, "host2");
put(2, null);
- put(3, "abc");
+ put(3, "host1");
}
};
@@ -174,121 +134,4 @@ public class TestContainerAllocator {
assertTrue(requestState.getRequestsToCountMap().keySet().size() == 0);
}
- /**
- * Test request containers with no containerToHostMapping makes the right number of requests
- */
- @Test
- public void testRequestContainersWithNoMapping() throws Exception {
- int containerCount = 4;
- Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>();
- for (int i = 0; i < containerCount; i++) {
- containersToHostMapping.put(i, null);
- }
- allocatorThread.start();
-
- containerAllocator.requestContainers(containersToHostMapping);
-
- assertNotNull(requestState);
-
- assertNotNull(requestState.getRequestsQueue());
- assertTrue(requestState.getRequestsQueue().size() == 4);
-
- // If host-affinty is not enabled, it doesn't update the requestMap
- assertNotNull(requestState.getRequestsToCountMap());
- assertTrue(requestState.getRequestsToCountMap().keySet().size() == 0);
- }
-
- /**
- * If the container fails to start e.g because it fails to connect to a NM on a host that
- * is down, the allocator should request a new container on a different host.
- */
- @Test
- public void testRerequestOnAnyHostIfContainerStartFails() throws Exception {
- final Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "2", 123);
- final Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "1", 123);
-
- ((MockContainerUtil) containerUtil).containerStartException = new IOException("You shall not... connect to the NM!");
-
- // Set up our final asserts before starting the allocator thread
- MockContainerListener listener = new MockContainerListener(2, 1, 2, null, new Runnable() {
- @Override
- public void run() {
- // The failed container should be released. The successful one should not.
- assertNotNull(testAMRMClient.getRelease());
- assertEquals(1, testAMRMClient.getRelease().size());
- assertTrue(testAMRMClient.getRelease().contains(container.getId()));
- }
- },
- new Runnable() {
- @Override
- public void run() {
- // Test that the first request assignment had a preferred host and the retry didn't
- assertEquals(2, requestState.assignedRequests.size());
-
- SamzaContainerRequest request = requestState.assignedRequests.remove();
- assertEquals(0, request.expectedContainerId);
- assertEquals("2", request.getPreferredHost());
-
- request = requestState.assignedRequests.remove();
- assertEquals(0, request.expectedContainerId);
- assertEquals("ANY_HOST", request.getPreferredHost());
-
- // This routine should be called after the retry is assigned, but before it's started.
- // So there should still be 1 container needed.
- assertEquals(1, state.neededContainers.get());
- }
- }
- );
- requestState.registerContainerListener(listener);
-
- allocatorThread.start();
-
- // Only request 1 container and we should see 2 assignments in the assertions above (because of the retry)
- containerAllocator.requestContainer(0, "2");
- containerAllocator.addContainer(container);
- containerAllocator.addContainer(container1);
-
- listener.verify();
- }
-
- /**
- * Extra allocated containers that are returned by the RM and unused by the AM should be released.
- * Containers are considered "extra" only when there are no more pending requests to fulfill
- * @throws Exception
- */
- @Test
- public void testAllocatorReleasesExtraContainers() throws Exception {
- final Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "abc", 123);
- final Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "abc", 123);
- final Container container2 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "def", 123);
-
- // Set up our final asserts before starting the allocator thread
- MockContainerListener listener = new MockContainerListener(3, 2, 0, null, new Runnable() {
- @Override
- public void run() {
- assertNotNull(testAMRMClient.getRelease());
- assertEquals(2, testAMRMClient.getRelease().size());
- assertTrue(testAMRMClient.getRelease().contains(container1.getId()));
- assertTrue(testAMRMClient.getRelease().contains(container2.getId()));
-
- // Test that state is cleaned up
- assertEquals(0, requestState.getRequestsQueue().size());
- assertEquals(0, requestState.getRequestsToCountMap().size());
- assertNull(requestState.getContainersOnAHost("abc"));
- assertNull(requestState.getContainersOnAHost("def"));
- }
- }, null);
- requestState.registerContainerListener(listener);
-
- allocatorThread.start();
-
- containerAllocator.requestContainer(0, "abc");
-
- containerAllocator.addContainer(container);
- containerAllocator.addContainer(container1);
- containerAllocator.addContainer(container2);
-
- listener.verify();
- }
-
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1497bf6c/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocatorCommon.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocatorCommon.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocatorCommon.java
new file mode 100644
index 0000000..5badd29
--- /dev/null
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocatorCommon.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.YarnConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.server.HttpServer;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.job.yarn.util.MockContainerListener;
+import org.apache.samza.job.yarn.util.MockContainerRequestState;
+import org.apache.samza.job.yarn.util.MockContainerUtil;
+import org.apache.samza.job.yarn.util.MockHttpServer;
+import org.apache.samza.job.yarn.util.TestAMRMClientImpl;
+import org.apache.samza.job.yarn.util.TestUtil;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Handles all common fields/tests for ContainerAllocators.
+ */
+public abstract class TestContainerAllocatorCommon {
+ protected static final String ANY_HOST = ContainerRequestState.ANY_HOST;
+
+ protected final HttpServer server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class));
+
+ protected AMRMClientAsyncImpl amRmClientAsync;
+ protected TestAMRMClientImpl testAMRMClient;
+ protected MockContainerRequestState requestState;
+ protected AbstractContainerAllocator containerAllocator;
+ protected Thread allocatorThread;
+ protected ContainerUtil containerUtil;
+
+ protected SamzaAppState state = new SamzaAppState(getCoordinator(1), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2);
+
+ protected abstract Config getConfig();
+ protected abstract MockContainerRequestState createContainerRequestState(AMRMClientAsync<AMRMClient.ContainerRequest> amClient);
+
+ private JobCoordinator getCoordinator(int containerCount) {
+ Map<Integer, ContainerModel> containers = new java.util.HashMap<>();
+ for (int i = 0; i < containerCount; i++) {
+ ContainerModel container = new ContainerModel(i, new HashMap<TaskName, TaskModel>());
+ containers.put(i, container);
+ }
+ JobModel jobModel = new JobModel(getConfig(), containers);
+ return new JobCoordinator(jobModel, server, null);
+ }
+
+
+ @Before
+ public void setup() throws Exception {
+ // Create AMRMClient
+ testAMRMClient = new TestAMRMClientImpl(
+ TestUtil.getAppMasterResponse(
+ false,
+ new ArrayList<Container>(),
+ new ArrayList<ContainerStatus>()
+ ));
+ amRmClientAsync = TestUtil.getAMClient(testAMRMClient);
+
+ // Initialize certain state variables
+ state.coordinatorUrl = new URL("http://localhost:7778/");
+
+ containerUtil = TestUtil.getContainerUtil(getConfig(), state);
+
+ requestState = createContainerRequestState(amRmClientAsync);
+ containerAllocator = new HostAwareContainerAllocator(
+ amRmClientAsync,
+ containerUtil,
+ new YarnConfig(getConfig())
+ );
+ Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState");
+ requestStateField.setAccessible(true);
+ requestStateField.set(containerAllocator, requestState);
+
+ allocatorThread = new Thread(containerAllocator);
+ }
+
+ @After
+ public void teardown() throws Exception {
+ containerAllocator.setIsRunning(false);
+ allocatorThread.join();
+ }
+
+ /**
+ * If the container fails to start e.g because it fails to connect to a NM on a host that
+ * is down, the allocator should request a new container on a different host.
+ */
+ @Test
+ public void testRerequestOnAnyHostIfContainerStartFails() throws Exception {
+ final Container container = TestUtil
+ .getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "host2", 123);
+ final Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "host1", 123);
+
+ ((MockContainerUtil) containerUtil).containerStartException = new IOException("You shall not... connect to the NM!");
+
+ Runnable releasedContainerAssertions = new Runnable() {
+ @Override
+ public void run() {
+ // The failed container should be released. The successful one should not.
+ assertNotNull(testAMRMClient.getRelease());
+ assertEquals(1, testAMRMClient.getRelease().size());
+ assertTrue(testAMRMClient.getRelease().contains(container.getId()));
+ }
+ };
+
+ Runnable assignedContainerAssertions = new Runnable() {
+ @Override
+ public void run() {
+ // Test that the first request assignment had a preferred host and the retry didn't
+ assertEquals(2, requestState.assignedRequests.size());
+
+ SamzaContainerRequest request = requestState.assignedRequests.remove();
+ assertEquals(0, request.expectedContainerId);
+ assertEquals("host2", request.getPreferredHost());
+
+ request = requestState.assignedRequests.remove();
+ assertEquals(0, request.expectedContainerId);
+ assertEquals("ANY_HOST", request.getPreferredHost());
+
+ // This routine should be called after the retry is assigned, but before it's started.
+ // So there should still be 1 container needed because neededContainers should not be decremented for a failed start.
+ assertEquals(1, state.neededContainers.get());
+ }
+ };
+
+ // Set up our final asserts before starting the allocator thread
+ MockContainerListener listener = new MockContainerListener(2, 1, 2, 0, null, releasedContainerAssertions, assignedContainerAssertions, null);
+ requestState.registerContainerListener(listener);
+ state.neededContainers.set(1); // Normally this would be done in the SamzaTaskManager
+
+ // Only request 1 container and we should see 2 assignments in the assertions above (because of the retry)
+ containerAllocator.requestContainer(0, "host2");
+ containerAllocator.addContainer(container);
+ containerAllocator.addContainer(container1);
+
+ allocatorThread.start();
+
+ listener.verify();
+ }
+
+
+ /**
+ * Extra allocated containers that are returned by the RM and unused by the AM should be released.
+ * Containers are considered "extra" only when there are no more pending requests to fulfill
+ * @throws Exception
+ */
+ @Test
+ public void testAllocatorReleasesExtraContainers() throws Exception {
+ final Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "host1", 123);
+ final Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "host1", 123);
+ final Container container2 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "host2", 123);
+
+ Runnable releasedContainerAssertions = new Runnable() {
+ @Override
+ public void run() {
+ assertNotNull(testAMRMClient.getRelease());
+ assertEquals(2, testAMRMClient.getRelease().size());
+ assertTrue(testAMRMClient.getRelease().contains(container1.getId()));
+ assertTrue(testAMRMClient.getRelease().contains(container2.getId()));
+
+ // Test that state is cleaned up
+ assertEquals(0, requestState.getRequestsQueue().size());
+ assertEquals(0, requestState.getRequestsToCountMap().size());
+ assertNull(requestState.getContainersOnAHost("host1"));
+ assertNull(requestState.getContainersOnAHost("host2"));
+ }
+ };
+
+ // Set up our final asserts before starting the allocator thread
+ MockContainerListener listener = new MockContainerListener(3, 2, 0, 0, null, releasedContainerAssertions, null, null);
+ requestState.registerContainerListener(listener);
+
+ containerAllocator.requestContainer(0, "host1");
+
+ containerAllocator.addContainer(container);
+ containerAllocator.addContainer(container1);
+ containerAllocator.addContainer(container2);
+
+ allocatorThread.start();
+
+ listener.verify();
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1497bf6c/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
index 93e430b..ead7200 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
@@ -18,36 +18,20 @@
*/
package org.apache.samza.job.yarn;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.net.URL;
-import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.YarnConfig;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.coordinator.JobCoordinator;
-import org.apache.samza.coordinator.server.HttpServer;
-import org.apache.samza.job.model.ContainerModel;
-import org.apache.samza.job.model.JobModel;
-import org.apache.samza.job.model.TaskModel;
import org.apache.samza.job.yarn.util.MockContainerListener;
import org.apache.samza.job.yarn.util.MockContainerRequestState;
import org.apache.samza.job.yarn.util.MockContainerUtil;
-import org.apache.samza.job.yarn.util.MockHttpServer;
-import org.apache.samza.job.yarn.util.TestAMRMClientImpl;
import org.apache.samza.job.yarn.util.TestUtil;
-import org.eclipse.jetty.servlet.DefaultServlet;
-import org.eclipse.jetty.servlet.ServletHolder;
-import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -55,19 +39,9 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-public class TestHostAwareContainerAllocator {
- private static final String ANY_HOST = ContainerRequestState.ANY_HOST;
+public class TestHostAwareContainerAllocator extends TestContainerAllocatorCommon {
- private final HttpServer server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class));
-
- private AMRMClientAsyncImpl amRmClientAsync;
- private TestAMRMClientImpl testAMRMClient;
- private MockContainerRequestState requestState;
- private HostAwareContainerAllocator containerAllocator;
- private Thread allocatorThread;
- private ContainerUtil containerUtil;
-
- private Config config = new MapConfig(new HashMap<String, String>() {
+ private final Config config = new MapConfig(new HashMap<String, String>() {
{
put("yarn.container.count", "1");
put("systems.test-system.samza.factory", "org.apache.samza.job.yarn.MockSystemFactory");
@@ -84,147 +58,14 @@ public class TestHostAwareContainerAllocator {
}
});
- private Config getConfig() {
- Map<String, String> map = new HashMap<>();
- map.putAll(config);
- return new MapConfig(map);
- }
-
- private SamzaAppState state = new SamzaAppState(getCoordinator(1), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2);
-
- private JobCoordinator getCoordinator(int containerCount) {
- Map<Integer, ContainerModel> containers = new java.util.HashMap<>();
- for (int i = 0; i < containerCount; i++) {
- ContainerModel container = new ContainerModel(i, new HashMap<TaskName, TaskModel>());
- containers.put(i, container);
- }
- JobModel jobModel = new JobModel(getConfig(), containers);
- return new JobCoordinator(jobModel, server, null);
- }
-
-
- @Before
- public void setup() throws Exception {
- // Create AMRMClient
- testAMRMClient = new TestAMRMClientImpl(
- TestUtil.getAppMasterResponse(
- false,
- new ArrayList<Container>(),
- new ArrayList<ContainerStatus>()
- ));
- amRmClientAsync = TestUtil.getAMClient(testAMRMClient);
-
- // Initialize certain state variables
- state.coordinatorUrl = new URL("http://localhost:7778/");
-
- containerUtil = TestUtil.getContainerUtil(getConfig(), state);
-
- requestState = new MockContainerRequestState(amRmClientAsync, true);
- containerAllocator = new HostAwareContainerAllocator(
- amRmClientAsync,
- containerUtil,
- new YarnConfig(config)
- );
- Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState");
- requestStateField.setAccessible(true);
- requestStateField.set(containerAllocator, requestState);
-
- allocatorThread = new Thread(containerAllocator);
+ @Override
+ protected Config getConfig() {
+ return config;
}
- @After
- public void teardown() throws Exception {
- containerAllocator.setIsRunning(false);
- allocatorThread.join();
- }
-
- /**
- * If the container fails to start e.g because it fails to connect to a NM on a host that
- * is down, the allocator should request a new container on a different host.
- */
- @Test
- public void testRerequestOnAnyHostIfContainerStartFails() throws Exception {
- final Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "2", 123);
- final Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "1", 123);
-
- ((MockContainerUtil) containerUtil).containerStartException = new IOException("You shall not... connect to the NM!");
-
- // Set up our final asserts before starting the allocator thread
- MockContainerListener listener = new MockContainerListener(2, 1, 2, null, new Runnable() {
- @Override
- public void run() {
- // The failed container should be released. The successful one should not.
- assertNotNull(testAMRMClient.getRelease());
- assertEquals(1, testAMRMClient.getRelease().size());
- assertTrue(testAMRMClient.getRelease().contains(container.getId()));
- }
- },
- new Runnable() {
- @Override
- public void run() {
- // Test that the first request assignment had a preferred host and the retry didn't
- assertEquals(2, requestState.assignedRequests.size());
-
- SamzaContainerRequest request = requestState.assignedRequests.remove();
- assertEquals(0, request.expectedContainerId);
- assertEquals("2", request.getPreferredHost());
-
- request = requestState.assignedRequests.remove();
- assertEquals(0, request.expectedContainerId);
- assertEquals("ANY_HOST", request.getPreferredHost());
-
- // This routine should be called after the retry is assigned, but before it's started.
- // So there should still be 1 container needed.
- assertEquals(1, state.neededContainers.get());
- }
- }
- );
- requestState.registerContainerListener(listener);
-
- // Only request 1 container and we should see 2 assignments in the assertions above (because of the retry)
- containerAllocator.requestContainer(0, "2");
- containerAllocator.addContainer(container1);
- containerAllocator.addContainer(container);
-
- allocatorThread.start();
-
- listener.verify();
- }
-
- @Test
- public void testAllocatorReleasesExtraContainers() throws Exception {
- final Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "abc", 123);
- final Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "abc", 123);
- final Container container2 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "def", 123);
-
- // Set up our final asserts before starting the allocator thread
- MockContainerListener listener = new MockContainerListener(3, 2, 0, null, new Runnable() {
- @Override
- public void run() {
- assertNotNull(testAMRMClient.getRelease());
- assertEquals(2, testAMRMClient.getRelease().size());
- assertTrue(testAMRMClient.getRelease().contains(container1.getId()));
- assertTrue(testAMRMClient.getRelease().contains(container2.getId()));
-
- // Test that state is cleaned up
- assertEquals(0, requestState.getRequestsQueue().size());
- assertEquals(0, requestState.getRequestsToCountMap().size());
- assertNull(requestState.getContainersOnAHost("abc"));
- assertNull(requestState.getContainersOnAHost("def"));
- }
- },
- null);
- requestState.registerContainerListener(listener);
-
- allocatorThread.start();
-
- containerAllocator.requestContainer(0, "abc");
-
- containerAllocator.addContainer(container);
- containerAllocator.addContainer(container1);
- containerAllocator.addContainer(container2);
-
- listener.verify();
+ @Override
+ protected MockContainerRequestState createContainerRequestState(AMRMClientAsync<AMRMClient.ContainerRequest> amClient) {
+ return new MockContainerRequestState(amClient, true);
}
/**
@@ -259,28 +100,28 @@ public class TestHostAwareContainerAllocator {
public void testAddContainerWithHostAffinity() throws Exception {
containerAllocator.requestContainers(new HashMap<Integer, String>() {
{
- put(0, "abc");
- put(1, "xyz");
+ put(0, "host1");
+ put(1, "host3");
}
});
- assertNotNull(requestState.getContainersOnAHost("abc"));
- assertEquals(0, requestState.getContainersOnAHost("abc").size());
+ assertNotNull(requestState.getContainersOnAHost("host1"));
+ assertEquals(0, requestState.getContainersOnAHost("host1").size());
- assertNotNull(requestState.getContainersOnAHost("xyz"));
- assertEquals(0, requestState.getContainersOnAHost("xyz").size());
+ assertNotNull(requestState.getContainersOnAHost("host3"));
+ assertEquals(0, requestState.getContainersOnAHost("host3").size());
assertNull(requestState.getContainersOnAHost(ANY_HOST));
- containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "abc", 123));
- containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000004"), "def", 123));
- containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "xyz", 123));
+ containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "host1", 123));
+ containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000004"), "host2", 123));
+ containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "host3", 123));
- assertNotNull(requestState.getContainersOnAHost("abc"));
- assertEquals(1, requestState.getContainersOnAHost("abc").size());
+ assertNotNull(requestState.getContainersOnAHost("host1"));
+ assertEquals(1, requestState.getContainersOnAHost("host1").size());
- assertNotNull(requestState.getContainersOnAHost("xyz"));
- assertEquals(1, requestState.getContainersOnAHost("xyz").size());
+ assertNotNull(requestState.getContainersOnAHost("host3"));
+ assertEquals(1, requestState.getContainersOnAHost("host3").size());
assertNotNull(requestState.getContainersOnAHost(ANY_HOST));
assertTrue(requestState.getContainersOnAHost(ANY_HOST).size() == 1);
@@ -293,10 +134,10 @@ public class TestHostAwareContainerAllocator {
public void testRequestContainers() throws Exception {
Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>() {
{
- put(0, "abc");
- put(1, "def");
+ put(0, "host1");
+ put(1, "host2");
put(2, null);
- put(3, "abc");
+ put(3, "host1");
}
};
allocatorThread.start();
@@ -314,11 +155,11 @@ public class TestHostAwareContainerAllocator {
assertNotNull(requestState.getRequestsToCountMap());
Map<String, AtomicInteger> requestsMap = requestState.getRequestsToCountMap();
- assertNotNull(requestsMap.get("abc"));
- assertEquals(2, requestsMap.get("abc").get());
+ assertNotNull(requestsMap.get("host1"));
+ assertEquals(2, requestsMap.get("host1").get());
- assertNotNull(requestsMap.get("def"));
- assertEquals(1, requestsMap.get("def").get());
+ assertNotNull(requestsMap.get("host2"));
+ assertEquals(1, requestsMap.get("host2").get());
assertNotNull(requestsMap.get(ANY_HOST));
assertEquals(1, requestsMap.get(ANY_HOST).get());
@@ -331,8 +172,8 @@ public class TestHostAwareContainerAllocator {
public void testExpiredRequestHandling() throws Exception {
Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>() {
{
- put(0, "abc");
- put(1, "def");
+ put(0, "requestedHost1");
+ put(1, "requestedHost2");
}
};
containerAllocator.requestContainers(containersToHostMapping);
@@ -341,51 +182,72 @@ public class TestHostAwareContainerAllocator {
assertTrue(requestState.getRequestsQueue().size() == 2);
assertNotNull(requestState.getRequestsToCountMap());
- assertNotNull(requestState.getRequestsToCountMap().get("abc"));
- assertTrue(requestState.getRequestsToCountMap().get("abc").get() == 1);
+ assertNotNull(requestState.getRequestsToCountMap().get("requestedHost1"));
+ assertTrue(requestState.getRequestsToCountMap().get("requestedHost1").get() == 1);
- assertNotNull(requestState.getRequestsToCountMap().get("def"));
- assertTrue(requestState.getRequestsToCountMap().get("def").get() == 1);
+ assertNotNull(requestState.getRequestsToCountMap().get("requestedHost2"));
+ assertTrue(requestState.getRequestsToCountMap().get("requestedHost2").get() == 1);
- // Set up our final asserts before starting the allocator thread
- MockContainerListener listener = new MockContainerListener(2, 0, 0, new Runnable() {
+ final Container container0 = TestUtil
+ .getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "availableHost1", 123);
+ final Container container1 = TestUtil
+ .getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "availableHost2", 123);
+
+ Runnable addedContainerAssertions = new Runnable() {
@Override
public void run() {
- assertNull(requestState.getContainersOnAHost("xyz"));
- assertNull(requestState.getContainersOnAHost("zzz"));
+ assertNotNull(requestState.getRequestsToCountMap());
+ assertNull(requestState.getContainersOnAHost("availableHost1"));
+ assertNull(requestState.getContainersOnAHost("availableHost2"));
assertNotNull(requestState.getContainersOnAHost(ANY_HOST));
assertTrue(requestState.getContainersOnAHost(ANY_HOST).size() == 2);
}
- }, null, null);
- requestState.registerContainerListener(listener);
-
- allocatorThread.start();
-
- Container container0 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "xyz", 123);
- Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "zzz", 123);
- containerAllocator.addContainer(container0);
- containerAllocator.addContainer(container1);
+ };
- listener.verify();
+ Runnable assignedContainerAssertions = new Runnable() {
+ @Override
+ public void run() {
+ List<Container> anyHostContainers = requestState.getContainersOnAHost(ANY_HOST);
+ assertTrue(anyHostContainers == null || anyHostContainers.isEmpty());
- Thread.sleep(1000);
+ assertNotNull(requestState.getRequestsQueue());
+ assertTrue(requestState.getRequestsQueue().size() == 0);
- MockContainerUtil mockContainerUtil = (MockContainerUtil) containerUtil;
+ assertNotNull(requestState.getRequestsToCountMap());
+ assertNotNull(requestState.getRequestsToCountMap().get("requestedHost1"));
+ assertNotNull(requestState.getRequestsToCountMap().get("requestedHost2"));
+ }
+ };
- assertNotNull(mockContainerUtil.runningContainerList.get("xyz"));
- assertTrue(mockContainerUtil.runningContainerList.get("xyz").contains(container0));
+ Runnable runningContainerAssertions = new Runnable() {
+ @Override
+ public void run() {
+ MockContainerUtil mockContainerUtil = (MockContainerUtil) containerUtil;
- assertNotNull(mockContainerUtil.runningContainerList.get("zzz"));
- assertTrue(mockContainerUtil.runningContainerList.get("zzz").contains(container1));
+ assertNotNull(mockContainerUtil.runningContainerList.get("availableHost1"));
+ assertTrue(mockContainerUtil.runningContainerList.get("availableHost1").contains(container0));
- assertNull(requestState.getContainersOnAHost(ANY_HOST));
+ assertNotNull(mockContainerUtil.runningContainerList.get("availableHost2"));
+ assertTrue(mockContainerUtil.runningContainerList.get("availableHost2").contains(container1));
+ }
+ };
+ // Set up our final asserts before starting the allocator thread
+ MockContainerListener listener = new MockContainerListener(
+ 2, 0, 2, 2,
+ addedContainerAssertions,
+ null,
+ assignedContainerAssertions,
+ runningContainerAssertions);
+ requestState.registerContainerListener(listener);
+ ((MockContainerUtil) containerUtil).registerContainerListener(listener);
- assertNotNull(requestState.getRequestsQueue());
- assertTrue(requestState.getRequestsQueue().size() == 0);
+ containerAllocator.addContainer(container0);
+ containerAllocator.addContainer(container1);
- assertNotNull(requestState.getRequestsToCountMap());
- assertNull(requestState.getRequestsToCountMap().get("abc"));
+ // Start after adding containers to avoid a race condition between the allocator thread
+ // using the containers and the assertions after the containers are added.
+ allocatorThread.start();
- assertNull(requestState.getRequestsToCountMap().get("def"));
+ listener.verify();
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1497bf6c/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerListener.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerListener.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerListener.java
index cb82ccc..43bda8f 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerListener.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerListener.java
@@ -19,82 +19,130 @@
package org.apache.samza.job.yarn.util;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.samza.job.yarn.SamzaContainerRequest;
-import scala.tools.nsc.Global;
import static org.junit.Assert.assertTrue;
public class MockContainerListener {
- private static final int NUM_CONDITIONS = 3;
- private boolean allContainersAdded = false;
- private boolean allContainersReleased = false;
- private final int numExpectedContainersAdded;
- private final int numExpectedContainersReleased;
- private final int numExpectedContainersAssigned;
- private final Runnable addContainerAssertions;
- private final Runnable releaseContainerAssertions;
- private final Runnable assignContainerAssertions;
+ private final CountDownLatch conditionLatch;
+
+
+ private final AsyncCountableCondition containersAdded;
+ private final AsyncCountableCondition containersReleased;
+ private final AsyncCountableCondition containersAssigned;
+ private final AsyncCountableCondition containersRunning;
+
+ private final AsyncCountableCondition[] allConditions;
public MockContainerListener(int numExpectedContainersAdded,
int numExpectedContainersReleased,
int numExpectedContainersAssigned,
+ int numExpectedContainersRunning,
Runnable addContainerAssertions,
Runnable releaseContainerAssertions,
- Runnable assignContainerAssertions) {
- this.numExpectedContainersAdded = numExpectedContainersAdded;
- this.numExpectedContainersReleased = numExpectedContainersReleased;
- this.numExpectedContainersAssigned = numExpectedContainersAssigned;
- this.addContainerAssertions = addContainerAssertions;
- this.releaseContainerAssertions = releaseContainerAssertions;
- this.assignContainerAssertions = assignContainerAssertions;
- }
+ Runnable assignContainerAssertions,
+ Runnable runContainerAssertions) {
+ containersAdded = new AsyncCountableCondition("containers added", numExpectedContainersAdded, addContainerAssertions);
+ containersReleased = new AsyncCountableCondition("containers released", numExpectedContainersReleased, releaseContainerAssertions);
+ containersAssigned = new AsyncCountableCondition("containers assigned", numExpectedContainersAssigned, assignContainerAssertions);
+ containersRunning = new AsyncCountableCondition("containers running", numExpectedContainersRunning, runContainerAssertions);
- public synchronized void postAddContainer(Container container, int totalAddedContainers) {
- if (totalAddedContainers == numExpectedContainersAdded) {
- if (addContainerAssertions != null) {
- addContainerAssertions.run();
+ allConditions = new AsyncCountableCondition[] {containersAdded, containersReleased, containersAssigned, containersRunning};
+
+ int unsatisfiedConditions = 0;
+ for (AsyncCountableCondition condition : allConditions) {
+ if (!condition.isSatisfied()) {
+ unsatisfiedConditions++;
}
+ }
+
+ conditionLatch = new CountDownLatch(unsatisfiedConditions);
+ }
- allContainersAdded = true;
- this.notifyAll();
+ public void postAddContainer(int totalAddedContainers) {
+ if (containersAdded.update(totalAddedContainers)) {
+ conditionLatch.countDown();
}
}
- public synchronized void postReleaseContainers(int totalReleasedContainers) {
- if (totalReleasedContainers == numExpectedContainersReleased) {
- if (releaseContainerAssertions != null) {
- releaseContainerAssertions.run();
- }
+ public void postReleaseContainers(int totalReleasedContainers) {
+ if (containersReleased.update(totalReleasedContainers)) {
+ conditionLatch.countDown();
+ }
+ }
- allContainersReleased = true;
- this.notifyAll();
+ public void postUpdateRequestStateAfterAssignment(int totalAssignedContainers) {
+ if (containersAssigned.update(totalAssignedContainers)) {
+ conditionLatch.countDown();
}
}
- public synchronized void verify() {
- // There could be 1 notifyAll() for each condition, so we must wait up to that many times
- for (int i = 0; i < NUM_CONDITIONS && !(allContainersAdded && allContainersReleased); i++) {
- try {
- this.wait(5000);
- } catch (InterruptedException e) {
- // Do nothing
- }
+ public void postRunContainer(int totalRunningContainers) {
+ if (containersRunning.update(totalRunningContainers)) {
+ conditionLatch.countDown();
}
+ }
- assertTrue("Not all containers were added.", allContainersAdded);
- assertTrue("Not all containers were released.", allContainersReleased);
+ /**
+ * This method should be called in the main thread. It waits for all the conditions to occur in the other
+ * threads and then verifies that they were in fact satisfied.
+ */
+ public void verify()
+ throws InterruptedException {
+ conditionLatch.await(5, TimeUnit.SECONDS);
+
+ for (AsyncCountableCondition condition : allConditions) {
+ condition.verify();
+ }
}
- public void postUpdateRequestStateAfterAssignment(int totalAssignedContainers) {
- if (totalAssignedContainers == numExpectedContainersAssigned) {
- if (assignContainerAssertions != null) {
- assignContainerAssertions.run();
+ private static class AsyncCountableCondition {
+ private boolean satisfied = false;
+ private final int expectedCount;
+ private final Runnable postConditionAssertions;
+ private final String name;
+ private AssertionError assertionError = null;
+
+ private AsyncCountableCondition(String name, int expectedCount, Runnable postConditionAssertions) {
+ this.name = name;
+ this.expectedCount = expectedCount;
+ if (expectedCount == 0) satisfied = true;
+ this.postConditionAssertions = postConditionAssertions;
+ }
+
+ public boolean update(int latestCount) {
+ if (!satisfied && latestCount == expectedCount) {
+ if (postConditionAssertions != null) {
+ try {
+ postConditionAssertions.run();
+ } catch (Throwable t) {
+ assertionError = new AssertionError(String.format("Assertion for '%s' failed", name), t);
+ }
+ }
+
+ satisfied = true;
+ return true;
}
+ return false;
+ }
+
+ public boolean isSatisfied() {
+ return satisfied;
+ }
+
+ public void verify() {
+ assertTrue(String.format("Condition '%s' was not satisfied", name), isSatisfied());
+
+ if (assertionError != null) {
+ throw assertionError;
+ }
+ }
- this.notifyAll();
+ @Override
+ public String toString() {
+ return name;
}
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1497bf6c/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java
index 879a7d0..7c0b504 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java
@@ -30,7 +30,7 @@ import org.apache.samza.job.yarn.SamzaContainerRequest;
public class MockContainerRequestState extends ContainerRequestState {
- private final List<MockContainerListener> _mockContainerListeners = new ArrayList<MockContainerListener>();
+ private final List<MockContainerListener> mockContainerListeners = new ArrayList<MockContainerListener>();
private int numAddedContainers = 0;
private int numReleasedContainers = 0;
private int numAssignedContainers = 0;
@@ -48,7 +48,7 @@ public class MockContainerRequestState extends ContainerRequestState {
numAssignedContainers++;
assignedRequests.add(request);
- for (MockContainerListener listener : _mockContainerListeners) {
+ for (MockContainerListener listener : mockContainerListeners) {
listener.postUpdateRequestStateAfterAssignment(numAssignedContainers);
}
}
@@ -58,8 +58,8 @@ public class MockContainerRequestState extends ContainerRequestState {
super.addContainer(container);
numAddedContainers++;
- for (MockContainerListener listener : _mockContainerListeners) {
- listener.postAddContainer(container, numAddedContainers);
+ for (MockContainerListener listener : mockContainerListeners) {
+ listener.postAddContainer(numAddedContainers);
}
}
@@ -67,7 +67,7 @@ public class MockContainerRequestState extends ContainerRequestState {
public synchronized int releaseExtraContainers() {
numReleasedContainers += super.releaseExtraContainers();
- for (MockContainerListener listener : _mockContainerListeners) {
+ for (MockContainerListener listener : mockContainerListeners) {
listener.postReleaseContainers(numReleasedContainers);
}
@@ -79,17 +79,12 @@ public class MockContainerRequestState extends ContainerRequestState {
super.releaseUnstartableContainer(container);
numReleasedContainers += 1;
- for (MockContainerListener listener : _mockContainerListeners) {
+ for (MockContainerListener listener : mockContainerListeners) {
listener.postReleaseContainers(numReleasedContainers);
}
}
public void registerContainerListener(MockContainerListener listener) {
- _mockContainerListeners.add(listener);
+ mockContainerListeners.add(listener);
}
-
- public void clearContainerListeners() {
- _mockContainerListeners.clear();
- }
-
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1497bf6c/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java
index 2f9669f..cf3e143 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java
@@ -33,6 +33,7 @@ import org.apache.samza.job.yarn.SamzaContainerLaunchException;
public class MockContainerUtil extends ContainerUtil {
+ private final List<MockContainerListener> mockContainerListeners = new ArrayList<MockContainerListener>();
public final Map<String, List<Container>> runningContainerList = new HashMap<>();
public Exception containerStartException = null;
@@ -54,6 +55,10 @@ public class MockContainerUtil extends ContainerUtil {
runningContainerList.put(hostname, list);
}
super.runContainer(samzaContainerId, container);
+
+ for (MockContainerListener listener : mockContainerListeners) {
+ listener.postRunContainer(runningContainerList.size());
+ }
}
@Override
@@ -64,4 +69,11 @@ public class MockContainerUtil extends ContainerUtil {
}
}
+ public void registerContainerListener(MockContainerListener listener) {
+ mockContainerListeners.add(listener);
+ }
+
+ public void clearContainerListeners() {
+ mockContainerListeners.clear();
+ }
}