Posted to commits@samza.apache.org by ni...@apache.org on 2017/08/09 17:50:55 UTC
[09/23] samza git commit: SAMZA-1359; Handle phantom container notifications cleanly during an RM fail-over
SAMZA-1359; Handle phantom container notifications cleanly during an RM fail-over
1. Improved our container-handling logic to be resilient to phantom notifications (see the sketch after this list).
2. Added a new metric to Samza's ContainerProcessManager module that tracks the number of such invalid notifications.
3. Added a couple of tests that simulate the exact scenario we encountered during the cluster upgrade (container starts -> container fails -> legitimate notification for the failure -> container re-start -> RM fail-over -> phantom notification with a different exit code).
4. As an aside, a number of tests in TestContainerProcessManager relied on Thread.sleep to ensure that threads ran in a certain order. Replaced these sleeps with explicit synchronization to remove the non-determinism and make the tests predictable.
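For context before the diff: the heart of the fix is an early return in ContainerProcessManager.onResourceCompleted when a notification cannot be matched to a running container. Below is a minimal, hypothetical sketch of that control flow; the map and method signature are simplified stand-ins, and the real lookup and exit-status handling appear in the diff that follows.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicInteger;

    // Condensed, hypothetical model of the phantom-notification guard.
    public class PhantomNotificationSketch {
      // resource ID (e.g. a YARN container ID string) -> logical Samza container ID
      private static final Map<String, Integer> runningContainers = new HashMap<>();
      private static final AtomicInteger redundantNotifications = new AtomicInteger(0);

      static void onResourceCompleted(String resourceId) {
        Integer containerId = runningContainers.get(resourceId);
        if (containerId == null) {
          // Phantom or duplicate notification (e.g. replayed by the RM after a
          // fail-over): count it and return without mutating any job state.
          redundantNotifications.incrementAndGet();
          return;
        }
        runningContainers.remove(resourceId);
        // ... only legitimate notifications reach the exit-status handling ...
      }

      public static void main(String[] args) {
        runningContainers.put("container_e01_0001", 0);
        onResourceCompleted("container_e01_0001"); // legitimate: state is updated
        onResourceCompleted("container_e01_0001"); // phantom: only the counter moves
        System.out.println(redundantNotifications.get()); // prints 1
      }
    }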
Author: Jagadish Venkatraman <jv...@jvenkatr-mn2.linkedin.biz>
Reviewers: Jake Maes <jm...@linkedin.com>
Closes #243 from vjagadish1989/am-bug
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/35143b67
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/35143b67
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/35143b67
Branch: refs/heads/0.14.0
Commit: 35143b676f23f69324b442fd1061318a663538f0
Parents: 91b22fd
Author: Jagadish Venkatraman <jv...@jvenkatr-mn2.linkedin.biz>
Authored: Tue Jul 25 11:18:14 2017 -0700
Committer: Jagadish <ja...@apache.org>
Committed: Tue Jul 25 11:18:14 2017 -0700
----------------------------------------------------------------------
.../clustermanager/ContainerProcessManager.java | 145 +++++++------
.../clustermanager/SamzaApplicationState.java | 8 +
.../ContainerProcessManagerMetrics.scala | 1 +
.../clustermanager/MockContainerAllocator.java | 24 +++
.../TestContainerProcessManager.java | 207 ++++++++++++++++---
5 files changed, 283 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/35143b67/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index 9b5e871..2861e9e 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -239,9 +239,10 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
}
if (containerId == null) {
log.info("No matching container id found for " + containerStatus.toString());
- } else {
- state.runningContainers.remove(containerId);
+ state.redundantNotifications.incrementAndGet();
+ return;
}
+ state.runningContainers.remove(containerId);
int exitStatus = containerStatus.getExitCode();
switch (exitStatus) {
@@ -250,10 +251,8 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
state.completedContainers.incrementAndGet();
- if (containerId != null) {
- state.finishedContainers.incrementAndGet();
- containerFailures.remove(containerId);
- }
+ state.finishedContainers.incrementAndGet();
+ containerFailures.remove(containerId);
if (state.completedContainers.get() == state.containerCount.get()) {
log.info("Setting job status to SUCCEEDED, since all containers have been marked as completed.");
@@ -273,18 +272,16 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
state.releasedContainers.incrementAndGet();
// If this container was assigned some partitions (a containerId), then
- // clean up, and request a refactor container for the tasks. This only
+ // clean up, and request a new container for the tasks. This only
// should happen if the container was 'lost' due to node failure, not
// if the AM released the container.
- if (containerId != null) {
- log.info("Released container {} was assigned task group ID {}. Requesting a refactor container for the task group.", containerIdStr, containerId);
+ log.info("Released container {} was assigned task group ID {}. Requesting a new container for the task group.", containerIdStr, containerId);
- state.neededContainers.incrementAndGet();
- state.jobHealthy.set(false);
+ state.neededContainers.incrementAndGet();
+ state.jobHealthy.set(false);
- // request a container on refactor host
- containerAllocator.requestResource(containerId, ResourceRequestState.ANY_HOST);
- }
+ // request a container on new host
+ containerAllocator.requestResource(containerId, ResourceRequestState.ANY_HOST);
break;
default:
@@ -296,72 +293,70 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
state.failedContainersStatus.put(containerIdStr, containerStatus);
state.jobHealthy.set(false);
- if (containerId != null) {
- state.neededContainers.incrementAndGet();
- // Find out previously running container location
- String lastSeenOn = state.jobModelManager.jobModel().getContainerToHostValue(containerId, SetContainerHostMapping.HOST_KEY);
- if (!hostAffinityEnabled || lastSeenOn == null) {
- lastSeenOn = ResourceRequestState.ANY_HOST;
+ state.neededContainers.incrementAndGet();
+ // Find out previously running container location
+ String lastSeenOn = state.jobModelManager.jobModel().getContainerToHostValue(containerId, SetContainerHostMapping.HOST_KEY);
+ if (!hostAffinityEnabled || lastSeenOn == null) {
+ lastSeenOn = ResourceRequestState.ANY_HOST;
+ }
+ log.info("Container was last seen on " + lastSeenOn);
+ // A container failed for an unknown reason. Let's check to see if
+ // we need to shutdown the whole app master if too many container
+ // failures have happened. The rules for failing are that the
+ // failure count for a task group id must be > the configured retry
+ // count, and the last failure (the one prior to this one) must have
+ // happened less than retry window ms ago. If retry count is set to
+ // 0, the app master will fail on any container failure. If the
+ // retry count is set to a number < 0, a container failure will
+ // never trigger an app master failure.
+ int retryCount = clusterManagerConfig.getContainerRetryCount();
+ int retryWindowMs = clusterManagerConfig.getContainerRetryWindowMs();
+
+ if (retryCount == 0) {
+ log.error("Container ID {} ({}) failed, and retry count is set to 0, so shutting down the application master, and marking the job as failed.", containerId, containerIdStr);
+
+ tooManyFailedContainers = true;
+ } else if (retryCount > 0) {
+ int currentFailCount;
+ long lastFailureTime;
+ if (containerFailures.containsKey(containerId)) {
+ ResourceFailure failure = containerFailures.get(containerId);
+ currentFailCount = failure.getCount() + 1;
+ lastFailureTime = failure.getLastFailure();
+ } else {
+ currentFailCount = 1;
+ lastFailureTime = 0L;
}
- log.info("Container was last seen on " + lastSeenOn);
- // A container failed for an unknown reason. Let's check to see if
- // we need to shutdown the whole app master if too many container
- // failures have happened. The rules for failing are that the
- // failure count for a task group id must be > the configured retry
- // count, and the last failure (the one prior to this one) must have
- // happened less than retry window ms ago. If retry count is set to
- // 0, the app master will fail on any container failure. If the
- // retry count is set to a number < 0, a container failure will
- // never trigger an app master failure.
- int retryCount = clusterManagerConfig.getContainerRetryCount();
- int retryWindowMs = clusterManagerConfig.getContainerRetryWindowMs();
-
- if (retryCount == 0) {
- log.error("Container ID {} ({}) failed, and retry count is set to 0, so shutting down the application master, and marking the job as failed.", containerId, containerIdStr);
-
- tooManyFailedContainers = true;
- } else if (retryCount > 0) {
- int currentFailCount;
- long lastFailureTime;
- if (containerFailures.containsKey(containerId)) {
- ResourceFailure failure = containerFailures.get(containerId);
- currentFailCount = failure.getCount() + 1;
- lastFailureTime = failure.getLastFailure();
+ if (currentFailCount >= retryCount) {
+ long lastFailureMsDiff = System.currentTimeMillis() - lastFailureTime;
+
+ if (lastFailureMsDiff < retryWindowMs) {
+ log.error("Container ID " + containerId + "(" + containerIdStr + ") has failed " + currentFailCount +
+ " times, with last failure " + lastFailureMsDiff + "ms ago. This is greater than retry count of " +
+ retryCount + " and window of " + retryWindowMs + "ms , so shutting down the application master, and marking the job as failed.");
+
+ // We have too many failures, and we're within the window
+ // boundary, so reset shut down the app master.
+ tooManyFailedContainers = true;
+ state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
} else {
- currentFailCount = 1;
- lastFailureTime = 0L;
- }
- if (currentFailCount >= retryCount) {
- long lastFailureMsDiff = System.currentTimeMillis() - lastFailureTime;
-
- if (lastFailureMsDiff < retryWindowMs) {
- log.error("Container ID " + containerId + "(" + containerIdStr + ") has failed " + currentFailCount +
- " times, with last failure " + lastFailureMsDiff + "ms ago. This is greater than retry count of " +
- retryCount + " and window of " + retryWindowMs + "ms , so shutting down the application master, and marking the job as failed.");
-
- // We have too many failures, and we're within the window
- // boundary, so reset shut down the app master.
- tooManyFailedContainers = true;
- state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
- } else {
- log.info("Resetting fail count for container ID {} back to 1, since last container failure ({}) for " +
- "this container ID was outside the bounds of the retry window.", containerId, containerIdStr);
-
- // Reset counter back to 1, since the last failure for this
- // container happened outside the window boundary.
- containerFailures.put(containerId, new ResourceFailure(1, System.currentTimeMillis()));
- }
- } else {
- log.info("Current fail count for container ID {} is {}.", containerId, currentFailCount);
- containerFailures.put(containerId, new ResourceFailure(currentFailCount, System.currentTimeMillis()));
+ log.info("Resetting fail count for container ID {} back to 1, since last container failure ({}) for " +
+ "this container ID was outside the bounds of the retry window.", containerId, containerIdStr);
+
+ // Reset counter back to 1, since the last failure for this
+ // container happened outside the window boundary.
+ containerFailures.put(containerId, new ResourceFailure(1, System.currentTimeMillis()));
}
+ } else {
+ log.info("Current fail count for container ID {} is {}.", containerId, currentFailCount);
+ containerFailures.put(containerId, new ResourceFailure(currentFailCount, System.currentTimeMillis()));
}
+ }
- if (!tooManyFailedContainers) {
- log.info("Requesting a refactor container ");
- // Request a refactor container
- containerAllocator.requestResource(containerId, lastSeenOn);
- }
+ if (!tooManyFailedContainers) {
+ log.info("Requesting a new container ");
+ // Request a new container
+ containerAllocator.requestResource(containerId, lastSeenOn);
}
}
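The long comment block in the hunk above spells out the retry rules in prose. As a worked example, here is a self-contained, hypothetical sketch of the same window check; retryCount and retryWindowMs would normally come from configuration (e.g. yarn.container.retry.count, as seen in the tests below), and the values here are purely illustrative.

    // Standalone illustration of the retry-window rules; not the real class.
    public class RetryWindowSketch {
      public static void main(String[] args) {
        int retryCount = 3;          // negative: never fail the AM; 0: fail on any failure
        long retryWindowMs = 60_000; // consecutive failures must land inside this window

        int currentFailCount = 3;    // this is the container's third failure
        long lastFailureTime = System.currentTimeMillis() - 10_000; // previous one, 10s ago

        boolean tooManyFailedContainers = false;
        if (retryCount == 0) {
          tooManyFailedContainers = true;
        } else if (retryCount > 0 && currentFailCount >= retryCount) {
          long sinceLastFailure = System.currentTimeMillis() - lastFailureTime;
          if (sinceLastFailure < retryWindowMs) {
            // Third failure, and the previous failure was only 10s ago (< 60s):
            // the app master gives up and marks the job FAILED.
            tooManyFailedContainers = true;
          } else {
            // Previous failure fell outside the window: reset the count to 1
            // and keep retrying.
            currentFailCount = 1;
          }
        }
        System.out.println("tooManyFailedContainers = " + tooManyFailedContainers); // true
      }
    }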
http://git-wip-us.apache.org/repos/asf/samza/blob/35143b67/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
index bde3fac..653fb4e 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
@@ -116,6 +116,14 @@ public class SamzaApplicationState {
public final AtomicInteger matchedResourceRequests = new AtomicInteger(0);
+ /**
+ * Number of invalid container notifications.
+ *
+ * A notification is "invalid" if the corresponding container is not currently managed by the
+ * {@link ContainerProcessManager}
+ */
+ public final AtomicInteger redundantNotifications = new AtomicInteger(0);
+
public SamzaApplicationState(JobModelManager jobModelManager) {
this.jobModelManager = jobModelManager;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/35143b67/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
index 6c3081b..c396ed6 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
@@ -52,6 +52,7 @@ class ContainerProcessManagerMetrics(
val mFailedContainers = newGauge("failed-containers", () => state.failedContainers.get())
val mReleasedContainers = newGauge("released-containers", () => state.releasedContainers.get())
val mContainers = newGauge("container-count", () => state.containerCount)
+ val mRedundantNotifications = newGauge("redundant-notifications", () => state.redundantNotifications.get())
val mJobHealthy = newGauge("job-healthy", () => if (state.jobHealthy.get()) 1 else 0)
val mLocalityMatchedRequests = newGauge(
http://git-wip-us.apache.org/repos/asf/samza/blob/35143b67/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocator.java
index 109ed47..449b484 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocator.java
@@ -23,9 +23,13 @@ import org.apache.samza.config.Config;
import java.lang.reflect.Field;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
public class MockContainerAllocator extends ContainerAllocator {
public int requestedContainers = 0;
+ private Semaphore semaphore = new Semaphore(0);
public MockContainerAllocator(ClusterResourceManager manager,
Config config,
@@ -33,6 +37,20 @@ public class MockContainerAllocator extends ContainerAllocator {
super(manager, config, state);
}
+ /**
+ * Causes the current thread to block until the expected number of containers have started.
+ *
+ * @param numExpectedContainers the number of containers expected to start
+ * @param timeout the maximum time to wait
+ * @param unit the time unit of the {@code timeout} argument
+ *
+ * @return a boolean that specifies whether containers started within the timeout.
+ * @throws InterruptedException if the current thread is interrupted while waiting
+ */
+ boolean awaitContainersStart(int numExpectedContainers, long timeout, TimeUnit unit) throws InterruptedException {
+ return semaphore.tryAcquire(numExpectedContainers, timeout, unit);
+ }
+
@Override
public void requestResources(Map<String, String> containerToHostMappings) {
requestedContainers += containerToHostMappings.size();
@@ -45,4 +63,10 @@ public class MockContainerAllocator extends ContainerAllocator {
return (ResourceRequestState) field.get(this);
}
+
+ @Override
+ protected void runStreamProcessor(SamzaResourceRequest request, String preferredHost) {
+ super.runStreamProcessor(request, preferredHost);
+ semaphore.release();
+ }
}
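MockContainerAllocator's awaitContainersStart replaces the Thread.sleep calls that the tests below previously relied on. The underlying pattern, in a self-contained sketch (the class and method names here are illustrative, not Samza APIs):

    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;

    // One permit is released per started container; the test thread blocks on
    // tryAcquire with a timeout instead of sleeping for a guessed duration.
    public class AwaitStartSketch {
      private static final Semaphore started = new Semaphore(0);

      static void runStreamProcessor() {
        // ... launch the container ...
        started.release();
      }

      static boolean awaitContainersStart(int n, long timeout, TimeUnit unit)
          throws InterruptedException {
        return started.tryAcquire(n, timeout, unit);
      }

      public static void main(String[] args) throws InterruptedException {
        new Thread(AwaitStartSketch::runStreamProcessor).start();
        System.out.println(awaitContainersStart(1, 2, TimeUnit.SECONDS)); // true
      }
    }

A Semaphore fits better here than a CountDownLatch because permits accumulate: the same allocator instance can be awaited repeatedly as a test starts container after container.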
http://git-wip-us.apache.org/repos/asf/samza/blob/35143b67/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index 8199559..6978341 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -32,16 +32,17 @@ import org.apache.samza.testUtils.MockHttpServer;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletHolder;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -173,15 +174,19 @@ public class TestContainerProcessManager {
state);
getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator);
-
+ CountDownLatch latch = new CountDownLatch(1);
getPrivateFieldFromTaskManager("allocatorThread", taskManager).set(taskManager, new Thread() {
public void run() {
isRunning = true;
+ latch.countDown();
}
});
taskManager.start();
- Thread.sleep(1000);
+
+ if (!latch.await(2, TimeUnit.SECONDS)) {
+ Assert.fail("timed out waiting for the latch to expire");
+ }
// Verify Allocator thread has started running
assertTrue(isRunning);
@@ -206,40 +211,56 @@ public class TestContainerProcessManager {
);
taskManager.start();
- Thread.sleep(100);
-
Thread allocatorThread = (Thread) getPrivateFieldFromTaskManager("allocatorThread", taskManager).get(taskManager);
assertTrue(allocatorThread.isAlive());
taskManager.stop();
- Thread.sleep(100);
assertFalse(allocatorThread.isAlive());
-
}
/**
* Test Task Manager should stop when all containers finish
*/
@Test
- public void testTaskManagerShouldStopWhenContainersFinish() {
+ public void testTaskManagerShouldStopWhenContainersFinish() throws Exception {
Config conf = getConfig();
state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
- ContainerProcessManager taskManager = new ContainerProcessManager(
- new MapConfig(conf),
- state,
- new MetricsRegistryMap(),
- manager
+ ContainerProcessManager taskManager = new ContainerProcessManager(
+ new MapConfig(conf),
+ state,
+ new MetricsRegistryMap(),
+ manager
);
+ MockContainerAllocator allocator = new MockContainerAllocator(
+ manager,
+ conf,
+ state);
+
+ getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator);
+
+ Thread thread = new Thread(allocator);
+ getPrivateFieldFromTaskManager("allocatorThread", taskManager).set(taskManager, thread);
+
+ // start triggers a request
taskManager.start();
assertFalse(taskManager.shouldShutdown());
+ assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
- taskManager.onResourceCompleted(new SamzaResourceStatus("123", "diagnostics", SamzaResourceStatus.SUCCESS));
+ SamzaResource container = new SamzaResource(1, 1024, "abc", "id0");
+ taskManager.onResourceAllocated(container);
+ // Allow container to run and update state
+ if (!allocator.awaitContainersStart(1,2, TimeUnit.SECONDS)) {
+ fail("timed out waiting for the containers to start");
+ }
+ assertFalse(taskManager.shouldShutdown());
+
+ taskManager.onResourceCompleted(new SamzaResourceStatus("id0", "diagnostics", SamzaResourceStatus.SUCCESS));
assertTrue(taskManager.shouldShutdown());
}
@@ -281,7 +302,9 @@ public class TestContainerProcessManager {
taskManager.onResourceAllocated(container);
// Allow container to run and update state
- Thread.sleep(300);
+ if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
+ fail("timed out waiting for the containers to start");
+ }
// Create first container failure
taskManager.onResourceCompleted(new SamzaResourceStatus(container.getResourceID(), "diagnostics", 1));
@@ -299,7 +322,9 @@ public class TestContainerProcessManager {
taskManager.onResourceAllocated(container);
// Allow container to run and update state
- Thread.sleep(1000);
+ if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
+ fail("timed out waiting for the containers to start");
+ }
assertTrue(state.jobHealthy.get());
@@ -318,6 +343,117 @@ public class TestContainerProcessManager {
taskManager.stop();
}
+ @Test
+ public void testInvalidNotificationsAreIgnored() throws Exception {
+ Config conf = getConfig();
+
+ Map<String, String> config = new HashMap<>();
+ config.putAll(getConfig());
+ state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+
+ ContainerProcessManager taskManager = new ContainerProcessManager(
+ new MapConfig(conf),
+ state,
+ new MetricsRegistryMap(),
+ manager
+ );
+
+ MockContainerAllocator allocator = new MockContainerAllocator(
+ manager,
+ conf,
+ state);
+ getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator);
+
+ Thread thread = new Thread(allocator);
+ getPrivateFieldFromTaskManager("allocatorThread", taskManager).set(taskManager, thread);
+
+ // Start the task manager
+ taskManager.start();
+
+ SamzaResource container = new SamzaResource(1, 1000, "abc", "id1");
+ taskManager.onResourceAllocated(container);
+
+ // Allow container to run and update state
+ if (!allocator.awaitContainersStart(1,2, TimeUnit.SECONDS)) {
+ fail("timed out waiting for the containers to start");
+ }
+
+ // Create container failure - with ContainerExitStatus.DISKS_FAILED
+ taskManager.onResourceCompleted(new SamzaResourceStatus("invalidContainerID", "Disk failure", SamzaResourceStatus.DISK_FAIL));
+
+ // The above failure should not trigger any container requests, since it is for an invalid container ID
+ assertEquals(0, allocator.getContainerRequestState().numPendingRequests());
+ assertFalse(taskManager.shouldShutdown());
+ assertTrue(state.jobHealthy.get());
+ assertEquals(state.redundantNotifications.get(), 1);
+ }
+
+ @Test
+ public void testDuplicateNotificationsDoNotAffectJobHealth() throws Exception {
+ Config conf = getConfig();
+
+ Map<String, String> config = new HashMap<>();
+ config.putAll(getConfig());
+ state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+
+ ContainerProcessManager taskManager = new ContainerProcessManager(
+ new MapConfig(conf),
+ state,
+ new MetricsRegistryMap(),
+ manager
+ );
+
+ MockContainerAllocator allocator = new MockContainerAllocator(
+ manager,
+ conf,
+ state);
+ getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator);
+
+ Thread thread = new Thread(allocator);
+ getPrivateFieldFromTaskManager("allocatorThread", taskManager).set(taskManager, thread);
+
+ // Start the task manager
+ taskManager.start();
+ assertFalse(taskManager.shouldShutdown());
+ assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
+
+ SamzaResource container1 = new SamzaResource(1, 1000, "abc", "id1");
+ taskManager.onResourceAllocated(container1);
+
+ // Allow container to run and update state
+ if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
+ fail("timed out waiting for the containers to start");
+ }
+ assertEquals(0, allocator.getContainerRequestState().numPendingRequests());
+
+ // Create container failure - with ContainerExitStatus.DISKS_FAILED
+ taskManager.onResourceCompleted(new SamzaResourceStatus(container1.getResourceID(), "Disk failure", SamzaResourceStatus.DISK_FAIL));
+
+ // The above failure should trigger a container request
+ assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
+ assertFalse(taskManager.shouldShutdown());
+ assertFalse(state.jobHealthy.get());
+ assertEquals(2, manager.resourceRequests.size());
+ assertEquals(0, manager.releasedResources.size());
+ assertEquals(ResourceRequestState.ANY_HOST, allocator.getContainerRequestState().peekPendingRequest().getPreferredHost());
+
+ SamzaResource container2 = new SamzaResource(1, 1000, "abc", "id2");
+ taskManager.onResourceAllocated(container2);
+
+ // Allow container to run and update state
+ if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
+ fail("timed out waiting for the containers to start");
+ }
+ assertTrue(state.jobHealthy.get());
+
+ // Simulate a duplicate notification for container 1 with a different exit code
+ taskManager.onResourceCompleted(new SamzaResourceStatus(container1.getResourceID(), "Disk failure", SamzaResourceStatus.PREEMPTED));
+ // assert that a duplicate notification does not change metrics (including job health)
+ assertEquals(state.redundantNotifications.get(), 1);
+ assertEquals(2, manager.resourceRequests.size());
+ assertEquals(0, manager.releasedResources.size());
+ assertTrue(state.jobHealthy.get());
+ }
/**
* Test AM requests a new container when a task fails
@@ -329,8 +465,6 @@ public class TestContainerProcessManager {
Map<String, String> config = new HashMap<>();
config.putAll(getConfig());
- config.remove("yarn.container.retry.count");
-
state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
ContainerProcessManager taskManager = new ContainerProcessManager(
@@ -354,14 +488,17 @@ public class TestContainerProcessManager {
assertFalse(taskManager.shouldShutdown());
assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
- SamzaResource container = new SamzaResource(1, 1000, "abc", "id1");
- taskManager.onResourceAllocated(container);
+ SamzaResource container1 = new SamzaResource(1, 1000, "abc", "id1");
+ taskManager.onResourceAllocated(container1);
// Allow container to run and update state
- Thread.sleep(300);
+ if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
+ fail("timed out waiting for the containers to start");
+ }
+ assertEquals(0, allocator.getContainerRequestState().numPendingRequests());
// Create container failure - with ContainerExitStatus.DISKS_FAILED
- taskManager.onResourceCompleted(new SamzaResourceStatus(container.getResourceID(), "Disk failure", SamzaResourceStatus.DISK_FAIL));
+ taskManager.onResourceCompleted(new SamzaResourceStatus(container1.getResourceID(), "Disk failure", SamzaResourceStatus.DISK_FAIL));
// The above failure should trigger a container request
assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
@@ -371,21 +508,37 @@ public class TestContainerProcessManager {
assertEquals(0, manager.releasedResources.size());
assertEquals(ResourceRequestState.ANY_HOST, allocator.getContainerRequestState().peekPendingRequest().getPreferredHost());
+ SamzaResource container2 = new SamzaResource(1, 1000, "abc", "id2");
+ taskManager.onResourceAllocated(container2);
+
+ // Allow container to run and update state
+ if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
+ fail("timed out waiting for the containers to start");
+ }
+
// Create container failure - with ContainerExitStatus.PREEMPTED
- taskManager.onResourceCompleted(new SamzaResourceStatus(container.getResourceID(), "Preemption", SamzaResourceStatus.PREEMPTED));
+ taskManager.onResourceCompleted(new SamzaResourceStatus(container2.getResourceID(), "Preemption", SamzaResourceStatus.PREEMPTED));
+ assertEquals(3, manager.resourceRequests.size());
// The above failure should trigger a container request
assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
assertFalse(taskManager.shouldShutdown());
assertFalse(state.jobHealthy.get());
assertEquals(ResourceRequestState.ANY_HOST, allocator.getContainerRequestState().peekPendingRequest().getPreferredHost());
+ SamzaResource container3 = new SamzaResource(1, 1000, "abc", "id3");
+ taskManager.onResourceAllocated(container3);
+
+ // Allow container to run and update state
+ if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
+ fail("timed out waiting for the containers to start");
+ }
// Create container failure - with ContainerExitStatus.ABORTED
- taskManager.onResourceCompleted(new SamzaResourceStatus(container.getResourceID(), "Aborted", SamzaResourceStatus.ABORTED));
+ taskManager.onResourceCompleted(new SamzaResourceStatus(container3.getResourceID(), "Aborted", SamzaResourceStatus.ABORTED));
// The above failure should trigger a container request
assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
- assertEquals(2, manager.resourceRequests.size());
+ assertEquals(4, manager.resourceRequests.size());
assertEquals(0, manager.releasedResources.size());
assertFalse(taskManager.shouldShutdown());
assertFalse(state.jobHealthy.get());