You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2019/03/19 17:51:08 UTC

[samza] branch master updated: SAMZA-2117: Handle race condition in container launch due to incorrect AM accounting

This is an automated email from the ASF dual-hosted git repository.

jagadish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a0d081  SAMZA-2117: Handle race condition in container launch due to incorrect AM accounting
0a0d081 is described below

commit 0a0d0814a5f324503631ea41fe4987c856f8fab3
Author: Jagadish <jv...@linkedin.com>
AuthorDate: Tue Mar 19 10:50:58 2019 -0700

    SAMZA-2117: Handle race condition in container launch due to incorrect AM accounting
    
    Steps involved when starting a Samza container:
    1. Issue a request to YARN to launch a container
    2. Record the container as "pending" (awaiting launch).
    3. When the launch succeeds, a callback on a different thread looks up the "pending" container and marks it as "running".
    
    There is a race condition in the above: if the launch callback runs before the main thread completes (2) (for example, because the main thread is pre-empted between (1) and (2)), the callback thread does not see the container as "pending" and therefore never transitions it to "running".
    
    This PR fixes the race by swapping the order of (1) and (2), i.e., recording the container as "pending" before issuing the launch request.
    Added a unit test and refactored the existing tests.
    
    Author: Jagadish <jv...@linkedin.com>
    
    Reviewers: Prateek M<pm...@linkedin.com>
    
    Closes #958 from vjagadish1989/samza-2117
---
 .../clustermanager/AbstractContainerAllocator.java |  9 ++--
 .../clustermanager/MockClusterResourceManager.java |  9 +++-
 .../MockClusterResourceManagerFactory.java         |  2 +-
 .../clustermanager/TestContainerAllocator.java     |  4 +-
 .../TestContainerProcessManager.java               | 51 +++++++++++++++-------
 .../clustermanager/TestContainerRequestState.java  |  2 +-
 .../TestHostAwareContainerAllocator.java           | 11 ++---
 7 files changed, 61 insertions(+), 27 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
index 7adb1cc..adc09fe 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
@@ -152,10 +152,13 @@ public abstract class AbstractContainerAllocator implements Runnable {
             + "timestamp {} to resource {}",
         new Object[]{preferredHost, String.valueOf(containerID), request.getRequestTimestampMs(), resource.getResourceID()});
 
-    //Submit a request to launch a StreamProcessor on the provided resource. To match with the response returned later
-    //in the callback, we should also store state about the container whose launch is pending.
-    clusterResourceManager.launchStreamProcessor(resource, builder);
+    // Update container state as "pending" and then issue a request to launch it. It's important to perform the state-update
+    // prior to issuing the request. Otherwise, there's a race where the response callback may arrive sooner and not see
+    // the container as "pending" (SAMZA-2117)
+
     state.pendingContainers.put(containerID, resource);
+
+    clusterResourceManager.launchStreamProcessor(resource, builder);
   }
 
   /**
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
index 4545a75..7d9f13a 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
@@ -21,6 +21,7 @@ package org.apache.samza.clustermanager;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.samza.job.CommandBuilder;
+import org.junit.Assert;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -40,11 +41,13 @@ public class MockClusterResourceManager extends ClusterResourceManager {
 
   private final Semaphore requestCountSemaphore = new Semaphore(0);
   private final Semaphore launchCountSemaphore = new Semaphore(0);
+  private final SamzaApplicationState state;
 
   Throwable nextException = null;
 
-  MockClusterResourceManager(ClusterResourceManager.Callback callback) {
+  MockClusterResourceManager(ClusterResourceManager.Callback callback, SamzaApplicationState state) {
     super(callback);
+    this.state = state;
   }
 
   @Override
@@ -81,12 +84,16 @@ public class MockClusterResourceManager extends ClusterResourceManager {
 
   @Override
   public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder)  {
+    // assert that the resource is in "pending" state prior to invoking this method
+    Assert.assertTrue(state.pendingContainers.values().contains(resource));
+
     if (nextException != null) {
       clusterManagerCallback.onStreamProcessorLaunchFailure(resource, new SamzaContainerLaunchException(nextException));
     } else {
       launchedResources.add(resource);
       clusterManagerCallback.onStreamProcessorLaunchSuccess(resource);
     }
+
     for (MockContainerListener listener : mockContainerListeners) {
       listener.postRunContainer(launchedResources.size());
     }
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerFactory.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerFactory.java
index 3a464c2..0701e35 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerFactory.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerFactory.java
@@ -27,6 +27,6 @@ public class MockClusterResourceManagerFactory implements ResourceManagerFactory
   @Override
   public ClusterResourceManager getClusterResourceManager(ClusterResourceManager.Callback callback,
       SamzaApplicationState state) {
-    return new MockClusterResourceManager(callback);
+    return new MockClusterResourceManager(callback, state);
   }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
index dd40e7c..a20c37f 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
@@ -41,11 +41,13 @@ import static org.junit.Assert.assertTrue;
 
 public class TestContainerAllocator {
   private final MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
-  private final MockClusterResourceManager manager = new MockClusterResourceManager(callback);
   private final Config config = getConfig();
   private final JobModelManager jobModelManager = JobModelManagerTestUtil.getJobModelManager(config, 1,
       new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class)));
+
   private final SamzaApplicationState state = new SamzaApplicationState(jobModelManager);
+  private final MockClusterResourceManager manager = new MockClusterResourceManager(callback, state);
+
   private ContainerAllocator containerAllocator;
   private MockContainerRequestState requestState;
   private Thread allocatorThread;
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index 841e5ba..324d17d 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -48,8 +48,6 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TestContainerProcessManager {
-  private final MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
-  private final MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback);
 
   private static volatile boolean isRunning = false;
 
@@ -85,7 +83,6 @@ public class TestContainerProcessManager {
 
   private HttpServer server = null;
 
-  private SamzaApplicationState state = null;
 
   private JobModelManager getJobModelManagerWithHostAffinity(Map<String, String> containerIdToHost) {
     Map<String, Map<String, String>> localityMap = new HashMap<>();
@@ -122,7 +119,10 @@ public class TestContainerProcessManager {
     conf.put("cluster-manager.container.memory.mb", "500");
     conf.put("cluster-manager.container.cpu.cores", "5");
 
-    state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
+    MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+
     ContainerProcessManager taskManager = new ContainerProcessManager(
         new MapConfig(conf),
         state,
@@ -143,6 +143,8 @@ public class TestContainerProcessManager {
     conf.put("cluster-manager.container.cpu.cores", "5");
 
     state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", "host1")));
+    callback = new MockClusterResourceManagerCallback();
+    clusterResourceManager = new MockClusterResourceManager(callback, state);
     taskManager = new ContainerProcessManager(
         new MapConfig(conf),
         state,
@@ -161,8 +163,9 @@ public class TestContainerProcessManager {
   @Test
   public void testOnInit() throws Exception {
     Config conf = getConfig();
-    state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
-
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
+    ClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ContainerProcessManager taskManager = new ContainerProcessManager(
         new MapConfig(conf),
         state,
@@ -203,7 +206,9 @@ public class TestContainerProcessManager {
   @Test
   public void testOnShutdown() throws Exception {
     Config conf = getConfig();
-    state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
+    MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
 
     ContainerProcessManager taskManager = new ContainerProcessManager(
         new MapConfig(conf),
@@ -227,7 +232,9 @@ public class TestContainerProcessManager {
   @Test
   public void testTaskManagerShouldStopWhenContainersFinish() throws Exception {
     Config conf = getConfig();
-    state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
+    MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
 
     ContainerProcessManager taskManager = new ContainerProcessManager(
         new MapConfig(conf),
@@ -275,7 +282,9 @@ public class TestContainerProcessManager {
   @Test
   public void testNewContainerRequestedOnFailureWithUnknownCode() throws Exception {
     Config conf = getConfig();
-    state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
+    MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
 
     ContainerProcessManager taskManager = new ContainerProcessManager(
         new MapConfig(conf),
@@ -352,7 +361,9 @@ public class TestContainerProcessManager {
 
     Map<String, String> config = new HashMap<>();
     config.putAll(getConfig());
-    state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
+    MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
 
     ContainerProcessManager taskManager = new ContainerProcessManager(
         new MapConfig(conf),
@@ -393,9 +404,11 @@ public class TestContainerProcessManager {
 
   @Test
   public void testRerequestOnAnyHostIfContainerStartFails() throws Exception {
-    state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("1", "host1")));
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("1", "host1")));
     Map<String, String> configMap = new HashMap<>();
     configMap.putAll(getConfig());
+    MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
+    MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
 
     MockContainerAllocator allocator = new MockContainerAllocator(
         clusterResourceManager,
@@ -421,8 +434,10 @@ public class TestContainerProcessManager {
     config.put("job.container.count", "2");
     Config cfg = new MapConfig(config);
     // 1. Request two containers on hosts - host1 and host2
-    state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", "host1",
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", "host1",
         "1", "host2")));
+    MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
+    MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
 
     ContainerProcessManager taskManager = new ContainerProcessManager(
         cfg,
@@ -487,7 +502,9 @@ public class TestContainerProcessManager {
 
     Map<String, String> config = new HashMap<>();
     config.putAll(getConfig());
-    state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
+    MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
 
     ContainerProcessManager taskManager = new ContainerProcessManager(
         new MapConfig(conf),
@@ -561,7 +578,9 @@ public class TestContainerProcessManager {
 
     Map<String, String> config = new HashMap<>();
     config.putAll(getConfig());
-    state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
+    MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
 
     ContainerProcessManager taskManager = new ContainerProcessManager(
         new MapConfig(conf),
@@ -648,7 +667,9 @@ public class TestContainerProcessManager {
   @Test
   public void testAppMasterWithFwk() {
     Config conf = getConfig();
-    state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
+    MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
 
     ContainerProcessManager taskManager = new ContainerProcessManager(
         new MapConfig(conf),
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java
index 3d52510..86dea85 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java
@@ -29,7 +29,7 @@ import static org.junit.Assert.assertTrue;
 public class TestContainerRequestState {
 
   private final MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
-  private final MockClusterResourceManager manager = new MockClusterResourceManager(callback);
+  private final MockClusterResourceManager manager = new MockClusterResourceManager(callback, new SamzaApplicationState(null));
 
   private static final String ANY_HOST = ResourceRequestState.ANY_HOST;
 
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
index fd3b452..c99be9b 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
@@ -49,10 +49,12 @@ import static org.mockito.Mockito.when;
 
 public class TestHostAwareContainerAllocator {
 
-  private final MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
-  private final MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback);
   private final Config config = getConfig();
-  private final JobModelManager reader = initializeJobModelManager(config, 1);
+  private final JobModelManager jobModelManager = initializeJobModelManager(config, 1);
+  private final MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
+  private final SamzaApplicationState state = new SamzaApplicationState(jobModelManager);
+
+  private final MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
 
   private JobModelManager initializeJobModelManager(Config config, int containerCount) {
     Map<String, Map<String, String>> localityMap = new HashMap<>();
@@ -66,7 +68,6 @@ public class TestHostAwareContainerAllocator {
         new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class)));
   }
 
-  private final SamzaApplicationState state = new SamzaApplicationState(reader);
   private HostAwareContainerAllocator containerAllocator;
   private final int timeoutMillis = 1000;
   private MockContainerRequestState requestState;
@@ -414,7 +415,7 @@ public class TestHostAwareContainerAllocator {
 
   @After
   public void teardown() throws Exception {
-    reader.stop();
+    jobModelManager.stop();
     containerAllocator.stop();
   }