You are viewing a plain-text version of this content; the canonical link to it was a hyperlink that is not preserved in this copy.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/08 14:53:13 UTC
[2/2] flink git commit: [FLINK-7076] [tests] Harden YarnResourceManagerTest#testStopWorker to properly wait for concurrent operations
[FLINK-7076] [tests] Harden YarnResourceManagerTest#testStopWorker to properly wait for concurrent operations
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cd532516
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cd532516
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cd532516
Branch: refs/heads/master
Commit: cd532516189b801bb02650665cbb109f8d8f8887
Parents: 59e3b01
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Nov 8 13:29:00 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 8 15:51:17 2017 +0100
----------------------------------------------------------------------
.../apache/flink/yarn/YarnResourceManager.java | 4 +-
.../flink/yarn/YarnResourceManagerTest.java | 77 +++++++++++---------
2 files changed, 44 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cd532516/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 7fa0e30..c900c83 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -245,7 +245,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
public boolean stopWorker(YarnWorkerNode workerNode) {
if (workerNode != null) {
Container container = workerNode.getContainer();
- log.info("Stopping container {}.", container.getId().toString());
+ log.info("Stopping container {}.", container.getId());
// release the container on the node manager
try {
nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
@@ -255,7 +255,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
resourceManagerClient.releaseAssignedContainer(container.getId());
workerNodeMap.remove(workerNode.getResourceID());
} else {
- log.error("Can not find container with resource ID {}.", workerNode.getResourceID().toString());
+ log.error("Can not find container with resource ID {}.", workerNode.getResourceID());
}
return true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cd532516/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index 808bc2b..18a358f 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -41,15 +42,14 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
-
import org.apache.flink.util.TestLogger;
import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
@@ -77,6 +77,7 @@ import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -86,8 +87,8 @@ import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES;
import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME;
import static org.apache.flink.yarn.YarnConfigKeys.FLINK_JAR_PATH;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
@@ -156,8 +157,8 @@ public class YarnResourceManagerTest extends TestLogger {
this.mockResourceManagerClient = mockResourceManagerClient;
}
- public void runInMainThread(Runnable runnable) {
- super.getMainThreadExecutor().execute(runnable);
+ public <T> CompletableFuture<T> runInMainThread(Callable<T> callable) {
+ return callAsync(callable, timeout);
}
public MainThreadExecutor getMainThreadExecutorForTesting() {
@@ -198,8 +199,6 @@ public class YarnResourceManagerTest extends TestLogger {
ApplicationAttemptId.newInstance(ApplicationId.newInstance(1L, 0), 0), 1);
public String taskHost = "host1";
- SlotReport slotReport = new SlotReport();
-
public NMClient mockNMClient = mock(NMClient.class);
public AMRMClientAsync<AMRMClient.ContainerRequest> mockResourceManagerClient =
mock(AMRMClientAsync.class);
@@ -323,16 +322,15 @@ public class YarnResourceManagerTest extends TestLogger {
new Context() {{
startResourceManager();
// Request slot from SlotManager.
- resourceManager.runInMainThread(() -> {
- try {
- rmServices.slotManager.registerSlotRequest(
- new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
- } catch (SlotManagerException e) {
- log.error("registerSlotRequest: {}", e);
- fail("registerSlotRequest should not throw exception.");
- }
+ CompletableFuture<?> registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
+ rmServices.slotManager.registerSlotRequest(
+ new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
+ return null;
});
+ // wait for the registerSlotRequest completion
+ registerSlotRequestFuture.get();
+
// Callback from YARN when container is allocated.
Container testingContainer = new TestContainer(taskHost, 1234, 1);
testingContainer.setResource(Resource.newInstance(200, 1));
@@ -344,33 +342,42 @@ public class YarnResourceManagerTest extends TestLogger {
// Remote task executor registers with YarnResourceManager.
TaskExecutorGateway mockTaskExecutorGateway = mock(TaskExecutorGateway.class);
rpcService.registerGateway(taskHost, mockTaskExecutorGateway);
- final ResourceManagerGateway rmGateway =
- resourceManager.getSelfGateway(ResourceManagerGateway.class);
- rmGateway.registerTaskExecutor(taskHost,
- new ResourceID(testingContainer.getId().toString()),
+
+ final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
+
+ final ResourceID taskManagerResourceId = new ResourceID(testingContainer.getId().toString());
+ final SlotReport slotReport = new SlotReport(
+ new SlotStatus(
+ new SlotID(taskManagerResourceId, 1),
+ new ResourceProfile(10, 1, 1, 1)));
+
+ CompletableFuture<Integer> numberRegisteredSlotsFuture = rmGateway
+ .registerTaskExecutor(
+ taskHost,
+ taskManagerResourceId,
slotReport,
dataPort,
hardwareDescription,
- Time.seconds(10)).handleAsync(
- (RegistrationResponse response, Throwable throwable) -> {
- assertTrue(rmServices.slotManager.getNumberRegisteredSlots() == 1);
- return null;
- }, resourceManager.getMainThreadExecutorForTesting());
+ Time.seconds(10L))
+ .handleAsync(
+ (RegistrationResponse response, Throwable throwable) -> rmServices.slotManager.getNumberRegisteredSlots(),
+ resourceManager.getMainThreadExecutorForTesting());
+
+ final int numberRegisteredSlots = numberRegisteredSlotsFuture.get();
+
+ assertEquals(1, numberRegisteredSlots);
// Unregister all task executors and release all containers.
- resourceManager.runInMainThread(() -> {
+ CompletableFuture<?> unregisterAndReleaseFuture = resourceManager.runInMainThread(() -> {
rmServices.slotManager.unregisterTaskManagersAndReleaseResources();
- try {
- verify(mockNMClient).stopContainer(any(ContainerId.class), any(NodeId.class));
- } catch (Exception e) {
- fail("stopContainer() should not throw exception.");
- }
- verify(mockResourceManagerClient).releaseAssignedContainer(any(ContainerId.class));
+ return null;
});
- final CompletableFuture<Void> barrier = new CompletableFuture<>();
- // Wait for all above operations to complete.
- resourceManager.runInMainThread(() -> barrier.complete(null));
- barrier.get();
+
+ unregisterAndReleaseFuture.get();
+
+ verify(mockNMClient).stopContainer(any(ContainerId.class), any(NodeId.class));
+ verify(mockResourceManagerClient).releaseAssignedContainer(any(ContainerId.class));
+
stopResourceManager();
// It's now safe to access the SlotManager state since the ResourceManager has been stopped.