You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this text rendering.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/08 14:53:12 UTC

[1/2] flink git commit: [FLINK-7076] [yarn] Implement stopWorker logic for YarnResourceManager

Repository: flink
Updated Branches:
  refs/heads/master 7d293692f -> cd5325161


[FLINK-7076] [yarn] Implement stopWorker logic for YarnResourceManager

This closes #4729.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/59e3b014
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/59e3b014
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/59e3b014

Branch: refs/heads/master
Commit: 59e3b014fef2015c497396d66fb00f2408f182b6
Parents: 7d29369
Author: Shuyi Chen <sh...@uber.com>
Authored: Thu Oct 5 14:54:08 2017 -0700
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 8 15:51:16 2017 +0100

----------------------------------------------------------------------
 .../clusterframework/MesosResourceManager.java  |  15 +-
 .../MesosResourceManagerTest.java               |   2 +-
 .../resourcemanager/ResourceManager.java        |  21 +-
 .../registration/WorkerRegistration.java        |   5 +-
 .../slotmanager/SlotManager.java                |  17 +
 .../apache/flink/yarn/YarnResourceManager.java  |  66 +++-
 .../org/apache/flink/yarn/YarnWorkerNode.java   |  49 +++
 .../flink/yarn/YarnResourceManagerTest.java     | 381 +++++++++++++++++++
 8 files changed, 517 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/59e3b014/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index 1e32b2c..cabb7d7 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -421,14 +421,13 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 	}
 
 	@Override
-	public boolean stopWorker(ResourceID resourceID) {
-		LOG.info("Stopping worker {}.", resourceID);
-
+	public boolean stopWorker(RegisteredMesosWorkerNode workerNode) {
+		LOG.info("Stopping worker {}.", workerNode.getResourceID());
 		try {
 
-			if (workersInLaunch.containsKey(resourceID)) {
+			if (workersInLaunch.containsKey(workerNode.getResourceID())) {
 				// update persistent state of worker to Released
-				MesosWorkerStore.Worker worker = workersInLaunch.remove(resourceID);
+				MesosWorkerStore.Worker worker = workersInLaunch.remove(workerNode.getResourceID());
 				worker = worker.releaseWorker();
 				workerStore.putWorker(worker);
 				workersBeingReturned.put(extractResourceID(worker.taskID()), worker);
@@ -440,11 +439,11 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 					launchCoordinator.tell(new LaunchCoordinator.Unassign(worker.taskID(), worker.hostname().get()), selfActor);
 				}
 			}
-			else if (workersBeingReturned.containsKey(resourceID)) {
-				LOG.info("Ignoring request to stop worker {} because it is already being stopped.", resourceID);
+			else if (workersBeingReturned.containsKey(workerNode.getResourceID())) {
+				LOG.info("Ignoring request to stop worker {} because it is already being stopped.", workerNode.getResourceID());
 			}
 			else {
-				LOG.warn("Unrecognized worker {}.", resourceID);
+				LOG.warn("Unrecognized worker {}.", workerNode.getResourceID());
 			}
 		}
 		catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/59e3b014/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index a0a8069..85ba142 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -701,7 +701,7 @@ public class MesosResourceManagerTest extends TestLogger {
 			resourceManager.launchCoordinator.expectMsgClass(LaunchCoordinator.Assign.class);
 
 			// tell the RM to stop the worker
-			resourceManager.stopWorker(extractResourceID(task1));
+			resourceManager.stopWorker(new RegisteredMesosWorkerNode(worker1launched));
 
 			// verify that the instance state was updated
 			MesosWorkerStore.Worker worker1Released = worker1launched.releaseWorker();

http://git-wip-us.apache.org/repos/asf/flink/blob/59e3b014/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index c8d7302..3ba8808 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -62,7 +63,6 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -86,7 +86,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *     <li>{@link #requestSlot(JobMasterId, SlotRequest, Time)} requests a slot from the resource manager</li>
  * </ul>
  */
-public abstract class ResourceManager<WorkerType extends Serializable>
+public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 		extends FencedRpcEndpoint<ResourceManagerId>
 		implements ResourceManagerGateway, LeaderContender {
 
@@ -800,21 +800,22 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	}
 
 	protected void releaseResource(InstanceID instanceId) {
-		ResourceID resourceID = null;
+		WorkerType worker = null;
 
 		// TODO: Improve performance by having an index on the instanceId
 		for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> entry : taskExecutors.entrySet()) {
 			if (entry.getValue().getInstanceID().equals(instanceId)) {
-				resourceID = entry.getKey();
+				worker = entry.getValue().getWorker();
 				break;
 			}
 		}
 
-		if (resourceID != null) {
-			if (stopWorker(resourceID)) {
-				closeTaskManagerConnection(resourceID, new FlinkException("Worker was stopped."));
+		if (worker != null) {
+			if (stopWorker(worker)) {
+				closeTaskManagerConnection(worker.getResourceID(),
+						new FlinkException("Worker was stopped."));
 			} else {
-				log.debug("Worker {} was not stopped.", resourceID);
+				log.debug("Worker {} was not stopped.", worker.getResourceID());
 			}
 		} else {
 			// unregister in order to clean up potential left over state
@@ -958,10 +959,10 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	/**
 	 * Stops the given worker.
 	 *
-	 * @param resourceID identifying the worker to be stopped
+	 * @param worker The worker.
 	 * @return True if the worker was stopped, otherwise false
 	 */
-	public abstract boolean stopWorker(ResourceID resourceID);
+	public abstract boolean stopWorker(WorkerType worker);
 
 	// ------------------------------------------------------------------------
 	//  Static utility classes

http://git-wip-us.apache.org/repos/asf/flink/blob/59e3b014/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
index 47697b6..eaa3c03 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
@@ -18,16 +18,15 @@
 
 package org.apache.flink.runtime.resourcemanager.registration;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.util.Preconditions;
 
-import java.io.Serializable;
-
 /**
  * This class extends the {@link TaskExecutorConnection}, adding the worker information.
  */
-public class WorkerRegistration<WorkerType extends Serializable> extends TaskExecutorConnection {
+public class WorkerRegistration<WorkerType extends ResourceIDRetrievable> extends TaskExecutorConnection {
 
 	private final WorkerType worker;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/59e3b014/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index ba52f02..7326719 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -959,4 +959,21 @@ public class SlotManager implements AutoCloseable {
 			return false;
 		}
 	}
+
+	@VisibleForTesting
+	public void unregisterTaskManagersAndReleaseResources() {
+		Iterator<Map.Entry<InstanceID, TaskManagerRegistration>> taskManagerRegistrationIterator =
+				taskManagerRegistrations.entrySet().iterator();
+
+		while (taskManagerRegistrationIterator.hasNext()) {
+			TaskManagerRegistration taskManagerRegistration =
+					taskManagerRegistrationIterator.next().getValue();
+
+			taskManagerRegistrationIterator.remove();
+
+			internalUnregisterTaskManager(taskManagerRegistration);
+
+			resourceActions.releaseResource(taskManagerRegistration.getInstanceId());
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/59e3b014/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index b32d25c..7fa0e30 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -55,6 +55,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 
 import scala.concurrent.duration.FiniteDuration;
@@ -63,11 +65,14 @@ import scala.concurrent.duration.FiniteDuration;
  * The yarn implementation of the resource manager. Used when the system is started
  * via the resource framework YARN.
  */
-public class YarnResourceManager extends ResourceManager<ResourceID> implements AMRMClientAsync.CallbackHandler {
+public class YarnResourceManager extends ResourceManager<YarnWorkerNode> implements AMRMClientAsync.CallbackHandler {
 
 	/** The process environment variables. */
 	private final Map<String, String> env;
 
+	/** YARN container map. Package private for unit test purposes. */
+	final ConcurrentMap<ResourceID, YarnWorkerNode> workerNodeMap;
+
 	/** The default registration timeout for task executor in seconds. */
 	private static final int DEFAULT_TASK_MANAGER_REGISTRATION_DURATION = 300;
 
@@ -133,6 +138,7 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 		this.flinkConfig  = flinkConfig;
 		this.yarnConfig = new YarnConfiguration();
 		this.env = env;
+		this.workerNodeMap = new ConcurrentHashMap<>();
 		final int yarnHeartbeatIntervalMS = flinkConfig.getInteger(
 				YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) * 1000;
 
@@ -149,25 +155,34 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 		numPendingContainerRequests = 0;
 	}
 
-	@Override
-	protected void initialize() throws ResourceManagerException {
-		resourceManagerClient = AMRMClientAsync.createAMRMClientAsync(yarnHeartbeatIntervalMillis, this);
-		resourceManagerClient.init(yarnConfig);
-		resourceManagerClient.start();
+	protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceManagerClient() {
+		AMRMClientAsync<AMRMClient.ContainerRequest> rmc = AMRMClientAsync.createAMRMClientAsync(yarnHeartbeatIntervalMillis, this);
+		rmc.init(yarnConfig);
+		rmc.start();
 		try {
 			//TODO: change akka address to tcp host and port, the getAddress() interface should return a standard tcp address
 			Tuple2<String, Integer> hostPort = parseHostPort(getAddress());
 			//TODO: the third paramter should be the webmonitor address
-			resourceManagerClient.registerApplicationMaster(hostPort.f0, hostPort.f1, getAddress());
+			rmc.registerApplicationMaster(hostPort.f0, hostPort.f1, getAddress());
 		} catch (Exception e) {
 			log.info("registerApplicationMaster fail", e);
 		}
+		return rmc;
+	}
 
+	protected NMClient createAndStartNodeManagerClient() {
 		// create the client to communicate with the node managers
-		nodeManagerClient = NMClient.createNMClient();
-		nodeManagerClient.init(yarnConfig);
-		nodeManagerClient.start();
-		nodeManagerClient.cleanupRunningContainersOnStop(true);
+		NMClient nmc = NMClient.createNMClient();
+		nmc.init(yarnConfig);
+		nmc.start();
+		nmc.cleanupRunningContainersOnStop(true);
+		return nmc;
+	}
+
+	@Override
+	protected void initialize() throws ResourceManagerException {
+		resourceManagerClient = createAndStartResourceManagerClient();
+		nodeManagerClient = createAndStartNodeManagerClient();
 	}
 
 	@Override
@@ -227,14 +242,27 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 	}
 
 	@Override
-	public boolean stopWorker(ResourceID resourceID) {
-		// TODO: Implement to stop the worker
-		return false;
+	public boolean stopWorker(YarnWorkerNode workerNode) {
+		if (workerNode != null) {
+			Container container = workerNode.getContainer();
+			log.info("Stopping container {}.", container.getId().toString());
+			// release the container on the node manager
+			try {
+				nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
+			} catch (Throwable t) {
+				log.warn("Error while calling YARN Node Manager to stop container", t);
+			}
+			resourceManagerClient.releaseAssignedContainer(container.getId());
+			workerNodeMap.remove(workerNode.getResourceID());
+		} else {
+			log.error("Can not find container with resource ID {}.", workerNode.getResourceID().toString());
+		}
+		return true;
 	}
 
 	@Override
-	protected ResourceID workerStarted(ResourceID resourceID) {
-		return resourceID;
+	protected YarnWorkerNode workerStarted(ResourceID resourceID) {
+		return workerNodeMap.get(resourceID);
 	}
 
 	// AMRMClientAsync CallbackHandler methods
@@ -260,10 +288,14 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 			numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
 			log.info("Received new container: {} - Remaining pending container requests: {}",
 					container.getId(), numPendingContainerRequests);
+			final String containerIdStr = container.getId().toString();
+			workerNodeMap.put(new ResourceID(containerIdStr),
+					new YarnWorkerNode(container));
 			try {
 				/** Context information used to start a TaskExecutor Java process */
 				ContainerLaunchContext taskExecutorLaunchContext =
-						createTaskExecutorLaunchContext(container.getResource(), container.getId().toString(), container.getNodeId().getHost());
+						createTaskExecutorLaunchContext(
+								container.getResource(), containerIdStr, container.getNodeId().getHost());
 				nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
 			}
 			catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/59e3b014/flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java
new file mode 100644
index 0000000..8e02066
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.yarn.api.records.Container;
+
+/**
+ * A stored YARN worker, which contains the YARN container.
+ */
+public class YarnWorkerNode implements ResourceIDRetrievable {
+
+	private final ResourceID resourceID;
+	private final Container container;
+
+	public YarnWorkerNode(Container container) {
+		Preconditions.checkNotNull(container);
+		this.resourceID = new ResourceID(container.getId().toString());
+		this.container = container;
+	}
+
+	@Override
+	public ResourceID getResourceID() {
+		return resourceID;
+	}
+
+	public Container getContainer() {
+		return container;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/59e3b014/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
new file mode 100644
index 0000000..808bc2b
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_APP_ID;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_HOME_DIR;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME;
+import static org.apache.flink.yarn.YarnConfigKeys.FLINK_JAR_PATH;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+/**
+ * General tests for the YARN resource manager component.
+ */
+public class YarnResourceManagerTest extends TestLogger {
+
+	private static final Logger LOG = LoggerFactory.getLogger(YarnResourceManagerTest.class);
+
+	private static Configuration flinkConfig = new Configuration();
+
+	private static Map<String, String> env = new HashMap<>();
+
+	private static final Time timeout = Time.seconds(10L);
+
+	@Rule
+	public TemporaryFolder folder = new TemporaryFolder();
+
+	@Before
+	public void setup() {
+		flinkConfig.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 100);
+		File root = folder.getRoot();
+		File home = new File(root, "home");
+		boolean created = home.mkdir();
+		assertTrue(created);
+
+		env.put(ENV_APP_ID, "foo");
+		env.put(ENV_CLIENT_HOME_DIR, home.getAbsolutePath());
+		env.put(ENV_CLIENT_SHIP_FILES, "");
+		env.put(ENV_FLINK_CLASSPATH, "");
+		env.put(ENV_HADOOP_USER_NAME, "foo");
+		env.put(FLINK_JAR_PATH, root.toURI().toString());
+	}
+
+	@After
+	public void teardown() {
+		env.clear();
+	}
+
+	static class TestingYarnResourceManager extends YarnResourceManager {
+		public AMRMClientAsync<AMRMClient.ContainerRequest> mockResourceManagerClient;
+		public NMClient mockNMClient;
+
+		public TestingYarnResourceManager(
+				RpcService rpcService,
+				String resourceManagerEndpointId,
+				ResourceID resourceId,
+				Configuration flinkConfig,
+				Map<String, String> env,
+				ResourceManagerConfiguration resourceManagerConfiguration,
+				HighAvailabilityServices highAvailabilityServices,
+				HeartbeatServices heartbeatServices,
+				SlotManager slotManager,
+				MetricRegistry metricRegistry,
+				JobLeaderIdService jobLeaderIdService,
+				FatalErrorHandler fatalErrorHandler,
+				AMRMClientAsync<AMRMClient.ContainerRequest> mockResourceManagerClient,
+				NMClient mockNMClient) {
+			super(rpcService, resourceManagerEndpointId, resourceId, flinkConfig, env,
+					resourceManagerConfiguration, highAvailabilityServices, heartbeatServices,
+					slotManager, metricRegistry, jobLeaderIdService, fatalErrorHandler);
+			this.mockNMClient = mockNMClient;
+			this.mockResourceManagerClient = mockResourceManagerClient;
+		}
+
+		public void runInMainThread(Runnable runnable) {
+			super.getMainThreadExecutor().execute(runnable);
+		}
+
+		public MainThreadExecutor getMainThreadExecutorForTesting() {
+			return super.getMainThreadExecutor();
+		}
+
+		@Override
+		protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceManagerClient() {
+			return mockResourceManagerClient;
+		}
+
+		@Override
+		protected NMClient createAndStartNodeManagerClient() {
+			return mockNMClient;
+		}
+	}
+
+	static class Context {
+
+		// services
+		final TestingRpcService rpcService;
+		final TestingFatalErrorHandler fatalErrorHandler;
+		final MockResourceManagerRuntimeServices rmServices;
+
+		// RM
+		final ResourceManagerConfiguration rmConfiguration;
+		final ResourceID rmResourceID;
+		static final String RM_ADDRESS = "resourceManager";
+		final TestingYarnResourceManager resourceManager;
+
+		final int dataPort = 1234;
+		final HardwareDescription hardwareDescription = new HardwareDescription(1, 2L, 3L, 4L);
+
+		// domain objects for test purposes
+		final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 200);
+
+		public ContainerId task = ContainerId.newInstance(
+				ApplicationAttemptId.newInstance(ApplicationId.newInstance(1L, 0), 0), 1);
+		public String taskHost = "host1";
+
+		SlotReport slotReport = new SlotReport();
+
+		public NMClient mockNMClient = mock(NMClient.class);
+		public AMRMClientAsync<AMRMClient.ContainerRequest> mockResourceManagerClient =
+				mock(AMRMClientAsync.class);
+
+		/**
+		 * Create mock RM dependencies.
+		 */
+		Context() throws Exception {
+			rpcService = new TestingRpcService();
+			fatalErrorHandler = new TestingFatalErrorHandler();
+			rmServices = new MockResourceManagerRuntimeServices();
+
+			// resource manager
+			rmConfiguration = new ResourceManagerConfiguration(
+					Time.seconds(5L),
+					Time.seconds(5L));
+			rmResourceID = ResourceID.generate();
+			resourceManager =
+					new TestingYarnResourceManager(
+							rpcService,
+							RM_ADDRESS,
+							rmResourceID,
+							flinkConfig,
+							env,
+							rmConfiguration,
+							rmServices.highAvailabilityServices,
+							rmServices.heartbeatServices,
+							rmServices.slotManager,
+							rmServices.metricRegistry,
+							rmServices.jobLeaderIdService,
+							fatalErrorHandler,
+							mockResourceManagerClient,
+							mockNMClient
+					);
+		}
+
+		/**
+		 * Mock services needed by the resource manager.
+		 */
+		class MockResourceManagerRuntimeServices {
+
+			public final ScheduledExecutor scheduledExecutor;
+			public final TestingHighAvailabilityServices highAvailabilityServices;
+			public final HeartbeatServices heartbeatServices;
+			public final MetricRegistry metricRegistry;
+			public final TestingLeaderElectionService rmLeaderElectionService;
+			public final JobLeaderIdService jobLeaderIdService;
+			public final SlotManager slotManager;
+
+			public UUID rmLeaderSessionId;
+
+			MockResourceManagerRuntimeServices() throws Exception {
+				scheduledExecutor = mock(ScheduledExecutor.class);
+				highAvailabilityServices = new TestingHighAvailabilityServices();
+				rmLeaderElectionService = new TestingLeaderElectionService();
+				highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
+				heartbeatServices = new TestingHeartbeatServices(5L, 5L, scheduledExecutor);
+				metricRegistry = new NoOpMetricRegistry();
+				slotManager = new SlotManager(
+						new ScheduledExecutorServiceAdapter(new DirectScheduledExecutorService()),
+						Time.seconds(10), Time.seconds(10), Time.minutes(1));
+				jobLeaderIdService = new JobLeaderIdService(
+						highAvailabilityServices,
+						rpcService.getScheduledExecutor(),
+						Time.minutes(5L));
+			}
+
+			public void grantLeadership() throws Exception {
+				rmLeaderSessionId = UUID.randomUUID();
+				rmLeaderElectionService.isLeader(rmLeaderSessionId).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+			}
+		}
+
+		/**
+		 * Start the resource manager and grant leadership to it.
+		 */
+		public void startResourceManager() throws Exception {
+			resourceManager.start();
+			rmServices.grantLeadership();
+		}
+
+		/**
+		 * Stop the Akka actor system.
+		 */
+		public void stopResourceManager() throws Exception {
+			rpcService.stopService();
+		}
+	}
+
+	/**
+	 * {@link Container} test double whose resource and priority can be set and read back.
+	 * Host, port and container id are forwarded unchanged to {@code UtilsTest.TestingContainer}.
+	 */
+	static class TestContainer extends UtilsTest.TestingContainer {
+		Resource resource;
+		Priority priority;
+
+		TestContainer(String host, int port, int containerId) {
+			super(host, port, containerId);
+		}
+
+		@Override
+		public void setResource(Resource newResource) {
+			resource = newResource;
+		}
+
+		@Override
+		public Resource getResource() {
+			return this.resource;
+		}
+
+		@Override
+		public void setPriority(Priority newPriority) {
+			priority = newPriority;
+		}
+
+		@Override
+		public Priority getPriority() {
+			return this.priority;
+		}
+	}
+
+	/**
+	 * Tests that, after a container has been allocated for a pending slot request and its task
+	 * executor has registered, {@code unregisterTaskManagersAndReleaseResources()} stops the
+	 * container on the node manager, releases it at the YARN resource manager, and leaves no
+	 * registered slots or task managers behind.
+	 *
+	 * <p>NOTE(review): several steps are not awaited by the test thread:
+	 * <ul>
+	 *     <li>{@code runInMainThread} takes a {@code Runnable} and returns nothing, so the slot
+	 *     request registration and the {@code verify} calls inside it run on the main thread;
+	 *     a {@code fail(...)} there is not propagated to the test thread.</li>
+	 *     <li>The future returned by {@code handleAsync(...)} is discarded. An assertion error in
+	 *     that callback only completes the discarded future exceptionally and cannot fail the test.
+	 *     Note also that {@code slotReport} appears to be an empty {@code SlotReport}. If so, the
+	 *     {@code == 1} check could not hold, and the failure would go unnoticed. Confirm.</li>
+	 *     <li>The final barrier is enqueued right after the unregister runnable. The
+	 *     {@code handleAsync} callback is only enqueued once registration completes, so the
+	 *     barrier is not guaranteed to run after it.</li>
+	 * </ul>
+	 * Consider returning futures from these steps and joining on them before asserting. Also,
+	 * {@code log.error("registerSlotRequest: {}", e)} lets the placeholder consume the
+	 * exception, so SLF4J logs only its {@code toString()} and not the stack trace.
+	 */
+	@Test
+	public void testStopWorker() throws Exception {
+		new Context() {{
+			startResourceManager();
+			// Request slot from SlotManager.
+			resourceManager.runInMainThread(() -> {
+				try {
+					rmServices.slotManager.registerSlotRequest(
+							new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
+				} catch (SlotManagerException e) {
+					log.error("registerSlotRequest: {}", e);
+					fail("registerSlotRequest should not throw exception.");
+				}
+			});
+
+			// Callback from YARN when container is allocated.
+			Container testingContainer = new TestContainer(taskHost, 1234, 1);
+			testingContainer.setResource(Resource.newInstance(200, 1));
+			testingContainer.setPriority(Priority.UNDEFINED);
+			resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
+			verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+			verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));
+
+			// Remote task executor registers with YarnResourceManager.
+			TaskExecutorGateway mockTaskExecutorGateway = mock(TaskExecutorGateway.class);
+			rpcService.registerGateway(taskHost, mockTaskExecutorGateway);
+			final ResourceManagerGateway rmGateway =
+					resourceManager.getSelfGateway(ResourceManagerGateway.class);
+			rmGateway.registerTaskExecutor(taskHost,
+					new ResourceID(testingContainer.getId().toString()),
+					slotReport,
+					dataPort,
+					hardwareDescription,
+					Time.seconds(10)).handleAsync(
+							(RegistrationResponse response, Throwable throwable) -> {
+								assertTrue(rmServices.slotManager.getNumberRegisteredSlots() == 1);
+								return null;
+							}, resourceManager.getMainThreadExecutorForTesting());
+
+			// Unregister all task executors and release all containers.
+			resourceManager.runInMainThread(() -> {
+				rmServices.slotManager.unregisterTaskManagersAndReleaseResources();
+				try {
+					verify(mockNMClient).stopContainer(any(ContainerId.class), any(NodeId.class));
+				} catch (Exception e) {
+					fail("stopContainer() should not throw exception.");
+				}
+				verify(mockResourceManagerClient).releaseAssignedContainer(any(ContainerId.class));
+			});
+			final CompletableFuture<Void> barrier = new CompletableFuture<>();
+			// Wait for all above operations to complete.
+			resourceManager.runInMainThread(() -> barrier.complete(null));
+			barrier.get();
+			stopResourceManager();
+
+			// It's now safe to access the SlotManager state since the ResourceManager has been stopped.
+			assertTrue(rmServices.slotManager.getNumberRegisteredSlots() == 0);
+			assertTrue(resourceManager.getNumberOfRegisteredTaskManagers().get() == 0);
+		}};
+	}
+}


[2/2] flink git commit: [FLINK-7076] [tests] Harden YarnResourceManagerTest#testStopWorker to properly wait for concurrent operations

Posted by tr...@apache.org.
[FLINK-7076] [tests] Harden YarnResourceManagerTest#testStopWorker to properly wait for concurrent operations


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cd532516
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cd532516
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cd532516

Branch: refs/heads/master
Commit: cd532516189b801bb02650665cbb109f8d8f8887
Parents: 59e3b01
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Nov 8 13:29:00 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 8 15:51:17 2017 +0100

----------------------------------------------------------------------
 .../apache/flink/yarn/YarnResourceManager.java  |  4 +-
 .../flink/yarn/YarnResourceManagerTest.java     | 77 +++++++++++---------
 2 files changed, 44 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cd532516/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 7fa0e30..c900c83 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -245,7 +245,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 	public boolean stopWorker(YarnWorkerNode workerNode) {
 		if (workerNode != null) {
 			Container container = workerNode.getContainer();
-			log.info("Stopping container {}.", container.getId().toString());
+			log.info("Stopping container {}.", container.getId());
 			// release the container on the node manager
 			try {
 				nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
@@ -255,7 +255,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 			resourceManagerClient.releaseAssignedContainer(container.getId());
 			workerNodeMap.remove(workerNode.getResourceID());
 		} else {
-			log.error("Can not find container with resource ID {}.", workerNode.getResourceID().toString());
+			log.error("Can not find container with resource ID {}.", workerNode.getResourceID());
 		}
 		return true;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd532516/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index 808bc2b..18a358f 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -41,15 +42,14 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
-
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
@@ -77,6 +77,7 @@ import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
@@ -86,8 +87,8 @@ import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES;
 import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
 import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME;
 import static org.apache.flink.yarn.YarnConfigKeys.FLINK_JAR_PATH;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
@@ -156,8 +157,8 @@ public class YarnResourceManagerTest extends TestLogger {
 			this.mockResourceManagerClient = mockResourceManagerClient;
 		}
 
-		public void runInMainThread(Runnable runnable) {
-			super.getMainThreadExecutor().execute(runnable);
+		public <T> CompletableFuture<T> runInMainThread(Callable<T> callable) {
+			return callAsync(callable, timeout);
 		}
 
 		public MainThreadExecutor getMainThreadExecutorForTesting() {
@@ -198,8 +199,6 @@ public class YarnResourceManagerTest extends TestLogger {
 				ApplicationAttemptId.newInstance(ApplicationId.newInstance(1L, 0), 0), 1);
 		public String taskHost = "host1";
 
-		SlotReport slotReport = new SlotReport();
-
 		public NMClient mockNMClient = mock(NMClient.class);
 		public AMRMClientAsync<AMRMClient.ContainerRequest> mockResourceManagerClient =
 				mock(AMRMClientAsync.class);
@@ -323,16 +322,15 @@ public class YarnResourceManagerTest extends TestLogger {
 		new Context() {{
 			startResourceManager();
 			// Request slot from SlotManager.
-			resourceManager.runInMainThread(() -> {
-				try {
-					rmServices.slotManager.registerSlotRequest(
-							new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
-				} catch (SlotManagerException e) {
-					log.error("registerSlotRequest: {}", e);
-					fail("registerSlotRequest should not throw exception.");
-				}
+			CompletableFuture<?> registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
+				rmServices.slotManager.registerSlotRequest(
+					new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
+				return null;
 			});
 
+			// wait for the registerSlotRequest completion
+			registerSlotRequestFuture.get();
+
 			// Callback from YARN when container is allocated.
 			Container testingContainer = new TestContainer(taskHost, 1234, 1);
 			testingContainer.setResource(Resource.newInstance(200, 1));
@@ -344,33 +342,42 @@ public class YarnResourceManagerTest extends TestLogger {
 			// Remote task executor registers with YarnResourceManager.
 			TaskExecutorGateway mockTaskExecutorGateway = mock(TaskExecutorGateway.class);
 			rpcService.registerGateway(taskHost, mockTaskExecutorGateway);
-			final ResourceManagerGateway rmGateway =
-					resourceManager.getSelfGateway(ResourceManagerGateway.class);
-			rmGateway.registerTaskExecutor(taskHost,
-					new ResourceID(testingContainer.getId().toString()),
+
+			final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
+
+			final ResourceID taskManagerResourceId = new ResourceID(testingContainer.getId().toString());
+			final SlotReport slotReport = new SlotReport(
+				new SlotStatus(
+					new SlotID(taskManagerResourceId, 1),
+					new ResourceProfile(10, 1, 1, 1)));
+
+			CompletableFuture<Integer> numberRegisteredSlotsFuture = rmGateway
+				.registerTaskExecutor(
+					taskHost,
+					taskManagerResourceId,
 					slotReport,
 					dataPort,
 					hardwareDescription,
-					Time.seconds(10)).handleAsync(
-							(RegistrationResponse response, Throwable throwable) -> {
-								assertTrue(rmServices.slotManager.getNumberRegisteredSlots() == 1);
-								return null;
-							}, resourceManager.getMainThreadExecutorForTesting());
+					Time.seconds(10L))
+				.handleAsync(
+					(RegistrationResponse response, Throwable throwable) -> rmServices.slotManager.getNumberRegisteredSlots(),
+					resourceManager.getMainThreadExecutorForTesting());
+
+			final int numberRegisteredSlots = numberRegisteredSlotsFuture.get();
+
+			assertEquals(1, numberRegisteredSlots);
 
 			// Unregister all task executors and release all containers.
-			resourceManager.runInMainThread(() -> {
+			CompletableFuture<?> unregisterAndReleaseFuture =  resourceManager.runInMainThread(() -> {
 				rmServices.slotManager.unregisterTaskManagersAndReleaseResources();
-				try {
-					verify(mockNMClient).stopContainer(any(ContainerId.class), any(NodeId.class));
-				} catch (Exception e) {
-					fail("stopContainer() should not throw exception.");
-				}
-				verify(mockResourceManagerClient).releaseAssignedContainer(any(ContainerId.class));
+				return null;
 			});
-			final CompletableFuture<Void> barrier = new CompletableFuture<>();
-			// Wait for all above operations to complete.
-			resourceManager.runInMainThread(() -> barrier.complete(null));
-			barrier.get();
+
+			unregisterAndReleaseFuture.get();
+
+			verify(mockNMClient).stopContainer(any(ContainerId.class), any(NodeId.class));
+			verify(mockResourceManagerClient).releaseAssignedContainer(any(ContainerId.class));
+
 			stopResourceManager();
 
 			// It's now safe to access the SlotManager state since the ResourceManager has been stopped.