You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2019/03/19 17:51:08 UTC
[samza] branch master updated: SAMZA-2117: Handle race condition in container launch due to incorrect AM accounting
This is an automated email from the ASF dual-hosted git repository.
jagadish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 0a0d081 SAMZA-2117: Handle race condition in container launch due to incorrect AM accounting
0a0d081 is described below
commit 0a0d0814a5f324503631ea41fe4987c856f8fab3
Author: Jagadish <jv...@linkedin.com>
AuthorDate: Tue Mar 19 10:50:58 2019 -0700
SAMZA-2117: Handle race condition in container launch due to incorrect AM accounting
Steps involved when starting a Samza container:
1. Issue a request to YARN to launch a container
2. Record that container as "pending" launch.
3. The launch callback succeeds on a different thread: it looks up the "pending" container and marks it as "running".
There is a race condition in the above: if the main thread is preempted between (1) and (2), the callback thread may run first and not see the container as "pending"; hence, it never transitions the container to the "running" state.
This PR fixes the race by swapping (1) and (2), i.e., recording the intent (marking the container as "pending") before issuing the launch request.
Added a unit test and refactored existing tests.
Author: Jagadish <jv...@linkedin.com>
Reviewers: Prateek M<pm...@linkedin.com>
Closes #958 from vjagadish1989/samza-2117
---
.../clustermanager/AbstractContainerAllocator.java | 9 ++--
.../clustermanager/MockClusterResourceManager.java | 9 +++-
.../MockClusterResourceManagerFactory.java | 2 +-
.../clustermanager/TestContainerAllocator.java | 4 +-
.../TestContainerProcessManager.java | 51 +++++++++++++++-------
.../clustermanager/TestContainerRequestState.java | 2 +-
.../TestHostAwareContainerAllocator.java | 11 ++---
7 files changed, 61 insertions(+), 27 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
index 7adb1cc..adc09fe 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
@@ -152,10 +152,13 @@ public abstract class AbstractContainerAllocator implements Runnable {
+ "timestamp {} to resource {}",
new Object[]{preferredHost, String.valueOf(containerID), request.getRequestTimestampMs(), resource.getResourceID()});
- //Submit a request to launch a StreamProcessor on the provided resource. To match with the response returned later
- //in the callback, we should also store state about the container whose launch is pending.
- clusterResourceManager.launchStreamProcessor(resource, builder);
+ // Update container state as "pending" and then issue a request to launch it. It's important to perform the state-update
+ // prior to issuing the request. Otherwise, there's a race where the response callback may arrive sooner and not see
+ // the container as "pending" (SAMZA-2117)
+
state.pendingContainers.put(containerID, resource);
+
+ clusterResourceManager.launchStreamProcessor(resource, builder);
}
/**
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
index 4545a75..7d9f13a 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
@@ -21,6 +21,7 @@ package org.apache.samza.clustermanager;
import com.google.common.collect.ImmutableList;
import org.apache.samza.job.CommandBuilder;
+import org.junit.Assert;
import java.util.ArrayList;
import java.util.Collections;
@@ -40,11 +41,13 @@ public class MockClusterResourceManager extends ClusterResourceManager {
private final Semaphore requestCountSemaphore = new Semaphore(0);
private final Semaphore launchCountSemaphore = new Semaphore(0);
+ private final SamzaApplicationState state;
Throwable nextException = null;
- MockClusterResourceManager(ClusterResourceManager.Callback callback) {
+ MockClusterResourceManager(ClusterResourceManager.Callback callback, SamzaApplicationState state) {
super(callback);
+ this.state = state;
}
@Override
@@ -81,12 +84,16 @@ public class MockClusterResourceManager extends ClusterResourceManager {
@Override
public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder) {
+ // assert that the resource is in "pending" state prior to invoking this method
+ Assert.assertTrue(state.pendingContainers.values().contains(resource));
+
if (nextException != null) {
clusterManagerCallback.onStreamProcessorLaunchFailure(resource, new SamzaContainerLaunchException(nextException));
} else {
launchedResources.add(resource);
clusterManagerCallback.onStreamProcessorLaunchSuccess(resource);
}
+
for (MockContainerListener listener : mockContainerListeners) {
listener.postRunContainer(launchedResources.size());
}
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerFactory.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerFactory.java
index 3a464c2..0701e35 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerFactory.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerFactory.java
@@ -27,6 +27,6 @@ public class MockClusterResourceManagerFactory implements ResourceManagerFactory
@Override
public ClusterResourceManager getClusterResourceManager(ClusterResourceManager.Callback callback,
SamzaApplicationState state) {
- return new MockClusterResourceManager(callback);
+ return new MockClusterResourceManager(callback, state);
}
}
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
index dd40e7c..a20c37f 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
@@ -41,11 +41,13 @@ import static org.junit.Assert.assertTrue;
public class TestContainerAllocator {
private final MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
- private final MockClusterResourceManager manager = new MockClusterResourceManager(callback);
private final Config config = getConfig();
private final JobModelManager jobModelManager = JobModelManagerTestUtil.getJobModelManager(config, 1,
new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class)));
+
private final SamzaApplicationState state = new SamzaApplicationState(jobModelManager);
+ private final MockClusterResourceManager manager = new MockClusterResourceManager(callback, state);
+
private ContainerAllocator containerAllocator;
private MockContainerRequestState requestState;
private Thread allocatorThread;
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index 841e5ba..324d17d 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -48,8 +48,6 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestContainerProcessManager {
- private final MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
- private final MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback);
private static volatile boolean isRunning = false;
@@ -85,7 +83,6 @@ public class TestContainerProcessManager {
private HttpServer server = null;
- private SamzaApplicationState state = null;
private JobModelManager getJobModelManagerWithHostAffinity(Map<String, String> containerIdToHost) {
Map<String, Map<String, String>> localityMap = new HashMap<>();
@@ -122,7 +119,10 @@ public class TestContainerProcessManager {
conf.put("cluster-manager.container.memory.mb", "500");
conf.put("cluster-manager.container.cpu.cores", "5");
- state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+ SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+ MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
+ MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+
ContainerProcessManager taskManager = new ContainerProcessManager(
new MapConfig(conf),
state,
@@ -143,6 +143,8 @@ public class TestContainerProcessManager {
conf.put("cluster-manager.container.cpu.cores", "5");
state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", "host1")));
+ callback = new MockClusterResourceManagerCallback();
+ clusterResourceManager = new MockClusterResourceManager(callback, state);
taskManager = new ContainerProcessManager(
new MapConfig(conf),
state,
@@ -161,8 +163,9 @@ public class TestContainerProcessManager {
@Test
public void testOnInit() throws Exception {
Config conf = getConfig();
- state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
-
+ SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+ MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
+ ClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
ContainerProcessManager taskManager = new ContainerProcessManager(
new MapConfig(conf),
state,
@@ -203,7 +206,9 @@ public class TestContainerProcessManager {
@Test
public void testOnShutdown() throws Exception {
Config conf = getConfig();
- state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+ SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+ MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
+ MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
ContainerProcessManager taskManager = new ContainerProcessManager(
new MapConfig(conf),
@@ -227,7 +232,9 @@ public class TestContainerProcessManager {
@Test
public void testTaskManagerShouldStopWhenContainersFinish() throws Exception {
Config conf = getConfig();
- state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+ SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+ MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
+ MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
ContainerProcessManager taskManager = new ContainerProcessManager(
new MapConfig(conf),
@@ -275,7 +282,9 @@ public class TestContainerProcessManager {
@Test
public void testNewContainerRequestedOnFailureWithUnknownCode() throws Exception {
Config conf = getConfig();
- state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+ SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+ MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
+ MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
ContainerProcessManager taskManager = new ContainerProcessManager(
new MapConfig(conf),
@@ -352,7 +361,9 @@ public class TestContainerProcessManager {
Map<String, String> config = new HashMap<>();
config.putAll(getConfig());
- state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+ SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+ MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
+ MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
ContainerProcessManager taskManager = new ContainerProcessManager(
new MapConfig(conf),
@@ -393,9 +404,11 @@ public class TestContainerProcessManager {
@Test
public void testRerequestOnAnyHostIfContainerStartFails() throws Exception {
- state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("1", "host1")));
+ SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("1", "host1")));
Map<String, String> configMap = new HashMap<>();
configMap.putAll(getConfig());
+ MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
+ MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
MockContainerAllocator allocator = new MockContainerAllocator(
clusterResourceManager,
@@ -421,8 +434,10 @@ public class TestContainerProcessManager {
config.put("job.container.count", "2");
Config cfg = new MapConfig(config);
// 1. Request two containers on hosts - host1 and host2
- state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", "host1",
+ SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", "host1",
"1", "host2")));
+ MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
+ MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
ContainerProcessManager taskManager = new ContainerProcessManager(
cfg,
@@ -487,7 +502,9 @@ public class TestContainerProcessManager {
Map<String, String> config = new HashMap<>();
config.putAll(getConfig());
- state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+ SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+ MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
+ MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
ContainerProcessManager taskManager = new ContainerProcessManager(
new MapConfig(conf),
@@ -561,7 +578,9 @@ public class TestContainerProcessManager {
Map<String, String> config = new HashMap<>();
config.putAll(getConfig());
- state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+ SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+ MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
+ MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
ContainerProcessManager taskManager = new ContainerProcessManager(
new MapConfig(conf),
@@ -648,7 +667,9 @@ public class TestContainerProcessManager {
@Test
public void testAppMasterWithFwk() {
Config conf = getConfig();
- state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+ SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+ MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
+ MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
ContainerProcessManager taskManager = new ContainerProcessManager(
new MapConfig(conf),
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java
index 3d52510..86dea85 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java
@@ -29,7 +29,7 @@ import static org.junit.Assert.assertTrue;
public class TestContainerRequestState {
private final MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
- private final MockClusterResourceManager manager = new MockClusterResourceManager(callback);
+ private final MockClusterResourceManager manager = new MockClusterResourceManager(callback, new SamzaApplicationState(null));
private static final String ANY_HOST = ResourceRequestState.ANY_HOST;
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
index fd3b452..c99be9b 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
@@ -49,10 +49,12 @@ import static org.mockito.Mockito.when;
public class TestHostAwareContainerAllocator {
- private final MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
- private final MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback);
private final Config config = getConfig();
- private final JobModelManager reader = initializeJobModelManager(config, 1);
+ private final JobModelManager jobModelManager = initializeJobModelManager(config, 1);
+ private final MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
+ private final SamzaApplicationState state = new SamzaApplicationState(jobModelManager);
+
+ private final MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
private JobModelManager initializeJobModelManager(Config config, int containerCount) {
Map<String, Map<String, String>> localityMap = new HashMap<>();
@@ -66,7 +68,6 @@ public class TestHostAwareContainerAllocator {
new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class)));
}
- private final SamzaApplicationState state = new SamzaApplicationState(reader);
private HostAwareContainerAllocator containerAllocator;
private final int timeoutMillis = 1000;
private MockContainerRequestState requestState;
@@ -414,7 +415,7 @@ public class TestHostAwareContainerAllocator {
@After
public void teardown() throws Exception {
- reader.stop();
+ jobModelManager.stop();
containerAllocator.stop();
}