You are viewing a plain text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this text version.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/08 14:53:13 UTC

[2/2] flink git commit: [FLINK-7076] [tests] Harden YarnResourceManagerTest#testStopWorker to properly wait for concurrent operations

[FLINK-7076] [tests] Harden YarnResourceManagerTest#testStopWorker to properly wait for concurrent operations


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cd532516
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cd532516
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cd532516

Branch: refs/heads/master
Commit: cd532516189b801bb02650665cbb109f8d8f8887
Parents: 59e3b01
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Nov 8 13:29:00 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 8 15:51:17 2017 +0100

----------------------------------------------------------------------
 .../apache/flink/yarn/YarnResourceManager.java  |  4 +-
 .../flink/yarn/YarnResourceManagerTest.java     | 77 +++++++++++---------
 2 files changed, 44 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cd532516/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 7fa0e30..c900c83 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -245,7 +245,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 	public boolean stopWorker(YarnWorkerNode workerNode) {
 		if (workerNode != null) {
 			Container container = workerNode.getContainer();
-			log.info("Stopping container {}.", container.getId().toString());
+			log.info("Stopping container {}.", container.getId());
 			// release the container on the node manager
 			try {
 				nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
@@ -255,7 +255,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 			resourceManagerClient.releaseAssignedContainer(container.getId());
 			workerNodeMap.remove(workerNode.getResourceID());
 		} else {
-			log.error("Can not find container with resource ID {}.", workerNode.getResourceID().toString());
+			log.error("Can not find container with resource ID {}.", workerNode.getResourceID());
 		}
 		return true;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd532516/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index 808bc2b..18a358f 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -41,15 +42,14 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
-
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
@@ -77,6 +77,7 @@ import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
@@ -86,8 +87,8 @@ import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES;
 import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
 import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME;
 import static org.apache.flink.yarn.YarnConfigKeys.FLINK_JAR_PATH;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
@@ -156,8 +157,8 @@ public class YarnResourceManagerTest extends TestLogger {
 			this.mockResourceManagerClient = mockResourceManagerClient;
 		}
 
-		public void runInMainThread(Runnable runnable) {
-			super.getMainThreadExecutor().execute(runnable);
+		public <T> CompletableFuture<T> runInMainThread(Callable<T> callable) {
+			return callAsync(callable, timeout);
 		}
 
 		public MainThreadExecutor getMainThreadExecutorForTesting() {
@@ -198,8 +199,6 @@ public class YarnResourceManagerTest extends TestLogger {
 				ApplicationAttemptId.newInstance(ApplicationId.newInstance(1L, 0), 0), 1);
 		public String taskHost = "host1";
 
-		SlotReport slotReport = new SlotReport();
-
 		public NMClient mockNMClient = mock(NMClient.class);
 		public AMRMClientAsync<AMRMClient.ContainerRequest> mockResourceManagerClient =
 				mock(AMRMClientAsync.class);
@@ -323,16 +322,15 @@ public class YarnResourceManagerTest extends TestLogger {
 		new Context() {{
 			startResourceManager();
 			// Request slot from SlotManager.
-			resourceManager.runInMainThread(() -> {
-				try {
-					rmServices.slotManager.registerSlotRequest(
-							new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
-				} catch (SlotManagerException e) {
-					log.error("registerSlotRequest: {}", e);
-					fail("registerSlotRequest should not throw exception.");
-				}
+			CompletableFuture<?> registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
+				rmServices.slotManager.registerSlotRequest(
+					new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
+				return null;
 			});
 
+			// wait for the registerSlotRequest completion
+			registerSlotRequestFuture.get();
+
 			// Callback from YARN when container is allocated.
 			Container testingContainer = new TestContainer(taskHost, 1234, 1);
 			testingContainer.setResource(Resource.newInstance(200, 1));
@@ -344,33 +342,42 @@ public class YarnResourceManagerTest extends TestLogger {
 			// Remote task executor registers with YarnResourceManager.
 			TaskExecutorGateway mockTaskExecutorGateway = mock(TaskExecutorGateway.class);
 			rpcService.registerGateway(taskHost, mockTaskExecutorGateway);
-			final ResourceManagerGateway rmGateway =
-					resourceManager.getSelfGateway(ResourceManagerGateway.class);
-			rmGateway.registerTaskExecutor(taskHost,
-					new ResourceID(testingContainer.getId().toString()),
+
+			final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
+
+			final ResourceID taskManagerResourceId = new ResourceID(testingContainer.getId().toString());
+			final SlotReport slotReport = new SlotReport(
+				new SlotStatus(
+					new SlotID(taskManagerResourceId, 1),
+					new ResourceProfile(10, 1, 1, 1)));
+
+			CompletableFuture<Integer> numberRegisteredSlotsFuture = rmGateway
+				.registerTaskExecutor(
+					taskHost,
+					taskManagerResourceId,
 					slotReport,
 					dataPort,
 					hardwareDescription,
-					Time.seconds(10)).handleAsync(
-							(RegistrationResponse response, Throwable throwable) -> {
-								assertTrue(rmServices.slotManager.getNumberRegisteredSlots() == 1);
-								return null;
-							}, resourceManager.getMainThreadExecutorForTesting());
+					Time.seconds(10L))
+				.handleAsync(
+					(RegistrationResponse response, Throwable throwable) -> rmServices.slotManager.getNumberRegisteredSlots(),
+					resourceManager.getMainThreadExecutorForTesting());
+
+			final int numberRegisteredSlots = numberRegisteredSlotsFuture.get();
+
+			assertEquals(1, numberRegisteredSlots);
 
 			// Unregister all task executors and release all containers.
-			resourceManager.runInMainThread(() -> {
+			CompletableFuture<?> unregisterAndReleaseFuture =  resourceManager.runInMainThread(() -> {
 				rmServices.slotManager.unregisterTaskManagersAndReleaseResources();
-				try {
-					verify(mockNMClient).stopContainer(any(ContainerId.class), any(NodeId.class));
-				} catch (Exception e) {
-					fail("stopContainer() should not throw exception.");
-				}
-				verify(mockResourceManagerClient).releaseAssignedContainer(any(ContainerId.class));
+				return null;
 			});
-			final CompletableFuture<Void> barrier = new CompletableFuture<>();
-			// Wait for all above operations to complete.
-			resourceManager.runInMainThread(() -> barrier.complete(null));
-			barrier.get();
+
+			unregisterAndReleaseFuture.get();
+
+			verify(mockNMClient).stopContainer(any(ContainerId.class), any(NodeId.class));
+			verify(mockResourceManagerClient).releaseAssignedContainer(any(ContainerId.class));
+
 			stopResourceManager();
 
 			// It's now safe to access the SlotManager state since the ResourceManager has been stopped.