You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/08 14:53:12 UTC
[1/2] flink git commit: [FLINK-7076] [yarn] Implement stopWorker logic for YarnResourceManager
Repository: flink
Updated Branches:
refs/heads/master 7d293692f -> cd5325161
[FLINK-7076] [yarn] Implement stopWorker logic for YarnResourceManager
This closes #4729.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/59e3b014
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/59e3b014
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/59e3b014
Branch: refs/heads/master
Commit: 59e3b014fef2015c497396d66fb00f2408f182b6
Parents: 7d29369
Author: Shuyi Chen <sh...@uber.com>
Authored: Thu Oct 5 14:54:08 2017 -0700
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 8 15:51:16 2017 +0100
----------------------------------------------------------------------
.../clusterframework/MesosResourceManager.java | 15 +-
.../MesosResourceManagerTest.java | 2 +-
.../resourcemanager/ResourceManager.java | 21 +-
.../registration/WorkerRegistration.java | 5 +-
.../slotmanager/SlotManager.java | 17 +
.../apache/flink/yarn/YarnResourceManager.java | 66 +++-
.../org/apache/flink/yarn/YarnWorkerNode.java | 49 +++
.../flink/yarn/YarnResourceManagerTest.java | 381 +++++++++++++++++++
8 files changed, 517 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/59e3b014/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index 1e32b2c..cabb7d7 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -421,14 +421,13 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
}
@Override
- public boolean stopWorker(ResourceID resourceID) {
- LOG.info("Stopping worker {}.", resourceID);
-
+ public boolean stopWorker(RegisteredMesosWorkerNode workerNode) {
+ LOG.info("Stopping worker {}.", workerNode.getResourceID());
try {
- if (workersInLaunch.containsKey(resourceID)) {
+ if (workersInLaunch.containsKey(workerNode.getResourceID())) {
// update persistent state of worker to Released
- MesosWorkerStore.Worker worker = workersInLaunch.remove(resourceID);
+ MesosWorkerStore.Worker worker = workersInLaunch.remove(workerNode.getResourceID());
worker = worker.releaseWorker();
workerStore.putWorker(worker);
workersBeingReturned.put(extractResourceID(worker.taskID()), worker);
@@ -440,11 +439,11 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
launchCoordinator.tell(new LaunchCoordinator.Unassign(worker.taskID(), worker.hostname().get()), selfActor);
}
}
- else if (workersBeingReturned.containsKey(resourceID)) {
- LOG.info("Ignoring request to stop worker {} because it is already being stopped.", resourceID);
+ else if (workersBeingReturned.containsKey(workerNode.getResourceID())) {
+ LOG.info("Ignoring request to stop worker {} because it is already being stopped.", workerNode.getResourceID());
}
else {
- LOG.warn("Unrecognized worker {}.", resourceID);
+ LOG.warn("Unrecognized worker {}.", workerNode.getResourceID());
}
}
catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/59e3b014/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index a0a8069..85ba142 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -701,7 +701,7 @@ public class MesosResourceManagerTest extends TestLogger {
resourceManager.launchCoordinator.expectMsgClass(LaunchCoordinator.Assign.class);
// tell the RM to stop the worker
- resourceManager.stopWorker(extractResourceID(task1));
+ resourceManager.stopWorker(new RegisteredMesosWorkerNode(worker1launched));
// verify that the instance state was updated
MesosWorkerStore.Worker worker1Released = worker1launched.releaseWorker();
http://git-wip-us.apache.org/repos/asf/flink/blob/59e3b014/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index c8d7302..3ba8808 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -62,7 +63,6 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -86,7 +86,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* <li>{@link #requestSlot(JobMasterId, SlotRequest, Time)} requests a slot from the resource manager</li>
* </ul>
*/
-public abstract class ResourceManager<WorkerType extends Serializable>
+public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
extends FencedRpcEndpoint<ResourceManagerId>
implements ResourceManagerGateway, LeaderContender {
@@ -800,21 +800,22 @@ public abstract class ResourceManager<WorkerType extends Serializable>
}
protected void releaseResource(InstanceID instanceId) {
- ResourceID resourceID = null;
+ WorkerType worker = null;
// TODO: Improve performance by having an index on the instanceId
for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> entry : taskExecutors.entrySet()) {
if (entry.getValue().getInstanceID().equals(instanceId)) {
- resourceID = entry.getKey();
+ worker = entry.getValue().getWorker();
break;
}
}
- if (resourceID != null) {
- if (stopWorker(resourceID)) {
- closeTaskManagerConnection(resourceID, new FlinkException("Worker was stopped."));
+ if (worker != null) {
+ if (stopWorker(worker)) {
+ closeTaskManagerConnection(worker.getResourceID(),
+ new FlinkException("Worker was stopped."));
} else {
- log.debug("Worker {} was not stopped.", resourceID);
+ log.debug("Worker {} was not stopped.", worker.getResourceID());
}
} else {
// unregister in order to clean up potential left over state
@@ -958,10 +959,10 @@ public abstract class ResourceManager<WorkerType extends Serializable>
/**
* Stops the given worker.
*
- * @param resourceID identifying the worker to be stopped
+ * @param worker The worker.
* @return True if the worker was stopped, otherwise false
*/
- public abstract boolean stopWorker(ResourceID resourceID);
+ public abstract boolean stopWorker(WorkerType worker);
// ------------------------------------------------------------------------
// Static utility classes
http://git-wip-us.apache.org/repos/asf/flink/blob/59e3b014/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
index 47697b6..eaa3c03 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
@@ -18,16 +18,15 @@
package org.apache.flink.runtime.resourcemanager.registration;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.util.Preconditions;
-import java.io.Serializable;
-
/**
* This class extends the {@link TaskExecutorConnection}, adding the worker information.
*/
-public class WorkerRegistration<WorkerType extends Serializable> extends TaskExecutorConnection {
+public class WorkerRegistration<WorkerType extends ResourceIDRetrievable> extends TaskExecutorConnection {
private final WorkerType worker;
http://git-wip-us.apache.org/repos/asf/flink/blob/59e3b014/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index ba52f02..7326719 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -959,4 +959,21 @@ public class SlotManager implements AutoCloseable {
return false;
}
}
+
+ @VisibleForTesting
+ public void unregisterTaskManagersAndReleaseResources() {
+ Iterator<Map.Entry<InstanceID, TaskManagerRegistration>> taskManagerRegistrationIterator =
+ taskManagerRegistrations.entrySet().iterator();
+
+ while (taskManagerRegistrationIterator.hasNext()) {
+ TaskManagerRegistration taskManagerRegistration =
+ taskManagerRegistrationIterator.next().getValue();
+
+ taskManagerRegistrationIterator.remove();
+
+ internalUnregisterTaskManager(taskManagerRegistration);
+
+ resourceActions.releaseResource(taskManagerRegistration.getInstanceId());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/59e3b014/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index b32d25c..7fa0e30 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -55,6 +55,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.FiniteDuration;
@@ -63,11 +65,14 @@ import scala.concurrent.duration.FiniteDuration;
* The yarn implementation of the resource manager. Used when the system is started
* via the resource framework YARN.
*/
-public class YarnResourceManager extends ResourceManager<ResourceID> implements AMRMClientAsync.CallbackHandler {
+public class YarnResourceManager extends ResourceManager<YarnWorkerNode> implements AMRMClientAsync.CallbackHandler {
/** The process environment variables. */
private final Map<String, String> env;
+ /** YARN container map. Package private for unit test purposes. */
+ final ConcurrentMap<ResourceID, YarnWorkerNode> workerNodeMap;
+
/** The default registration timeout for task executor in seconds. */
private static final int DEFAULT_TASK_MANAGER_REGISTRATION_DURATION = 300;
@@ -133,6 +138,7 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
this.flinkConfig = flinkConfig;
this.yarnConfig = new YarnConfiguration();
this.env = env;
+ this.workerNodeMap = new ConcurrentHashMap<>();
final int yarnHeartbeatIntervalMS = flinkConfig.getInteger(
YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) * 1000;
@@ -149,25 +155,34 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
numPendingContainerRequests = 0;
}
- @Override
- protected void initialize() throws ResourceManagerException {
- resourceManagerClient = AMRMClientAsync.createAMRMClientAsync(yarnHeartbeatIntervalMillis, this);
- resourceManagerClient.init(yarnConfig);
- resourceManagerClient.start();
+ protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceManagerClient() {
+ AMRMClientAsync<AMRMClient.ContainerRequest> rmc = AMRMClientAsync.createAMRMClientAsync(yarnHeartbeatIntervalMillis, this);
+ rmc.init(yarnConfig);
+ rmc.start();
try {
//TODO: change akka address to tcp host and port, the getAddress() interface should return a standard tcp address
Tuple2<String, Integer> hostPort = parseHostPort(getAddress());
//TODO: the third paramter should be the webmonitor address
- resourceManagerClient.registerApplicationMaster(hostPort.f0, hostPort.f1, getAddress());
+ rmc.registerApplicationMaster(hostPort.f0, hostPort.f1, getAddress());
} catch (Exception e) {
log.info("registerApplicationMaster fail", e);
}
+ return rmc;
+ }
+ protected NMClient createAndStartNodeManagerClient() {
// create the client to communicate with the node managers
- nodeManagerClient = NMClient.createNMClient();
- nodeManagerClient.init(yarnConfig);
- nodeManagerClient.start();
- nodeManagerClient.cleanupRunningContainersOnStop(true);
+ NMClient nmc = NMClient.createNMClient();
+ nmc.init(yarnConfig);
+ nmc.start();
+ nmc.cleanupRunningContainersOnStop(true);
+ return nmc;
+ }
+
+ @Override
+ protected void initialize() throws ResourceManagerException {
+ resourceManagerClient = createAndStartResourceManagerClient();
+ nodeManagerClient = createAndStartNodeManagerClient();
}
@Override
@@ -227,14 +242,27 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
}
@Override
- public boolean stopWorker(ResourceID resourceID) {
- // TODO: Implement to stop the worker
- return false;
+ public boolean stopWorker(YarnWorkerNode workerNode) {
+ if (workerNode != null) {
+ Container container = workerNode.getContainer();
+ log.info("Stopping container {}.", container.getId().toString());
+ // release the container on the node manager
+ try {
+ nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
+ } catch (Throwable t) {
+ log.warn("Error while calling YARN Node Manager to stop container", t);
+ }
+ resourceManagerClient.releaseAssignedContainer(container.getId());
+ workerNodeMap.remove(workerNode.getResourceID());
+ } else {
+ log.error("Can not find container with resource ID {}.", workerNode.getResourceID().toString());
+ }
+ return true;
}
@Override
- protected ResourceID workerStarted(ResourceID resourceID) {
- return resourceID;
+ protected YarnWorkerNode workerStarted(ResourceID resourceID) {
+ return workerNodeMap.get(resourceID);
}
// AMRMClientAsync CallbackHandler methods
@@ -260,10 +288,14 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
log.info("Received new container: {} - Remaining pending container requests: {}",
container.getId(), numPendingContainerRequests);
+ final String containerIdStr = container.getId().toString();
+ workerNodeMap.put(new ResourceID(containerIdStr),
+ new YarnWorkerNode(container));
try {
/** Context information used to start a TaskExecutor Java process */
ContainerLaunchContext taskExecutorLaunchContext =
- createTaskExecutorLaunchContext(container.getResource(), container.getId().toString(), container.getNodeId().getHost());
+ createTaskExecutorLaunchContext(
+ container.getResource(), containerIdStr, container.getNodeId().getHost());
nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
}
catch (Throwable t) {
http://git-wip-us.apache.org/repos/asf/flink/blob/59e3b014/flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java
new file mode 100644
index 0000000..8e02066
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.yarn.api.records.Container;
+
+/**
+ * A stored YARN worker, which contains the YARN container.
+ */
+public class YarnWorkerNode implements ResourceIDRetrievable {
+
+ private final ResourceID resourceID;
+ private final Container container;
+
+ public YarnWorkerNode(Container container) {
+ Preconditions.checkNotNull(container);
+ this.resourceID = new ResourceID(container.getId().toString());
+ this.container = container;
+ }
+
+ @Override
+ public ResourceID getResourceID() {
+ return resourceID;
+ }
+
+ public Container getContainer() {
+ return container;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/59e3b014/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
new file mode 100644
index 0000000..808bc2b
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_APP_ID;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_HOME_DIR;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME;
+import static org.apache.flink.yarn.YarnConfigKeys.FLINK_JAR_PATH;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+/**
+ * General tests for the YARN resource manager component.
+ */
+public class YarnResourceManagerTest extends TestLogger {
+
+ private static final Logger LOG = LoggerFactory.getLogger(YarnResourceManagerTest.class);
+
+ private static Configuration flinkConfig = new Configuration();
+
+ private static Map<String, String> env = new HashMap<>();
+
+ private static final Time timeout = Time.seconds(10L);
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
+ @Before
+ public void setup() {
+ flinkConfig.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 100);
+ File root = folder.getRoot();
+ File home = new File(root, "home");
+ boolean created = home.mkdir();
+ assertTrue(created);
+
+ env.put(ENV_APP_ID, "foo");
+ env.put(ENV_CLIENT_HOME_DIR, home.getAbsolutePath());
+ env.put(ENV_CLIENT_SHIP_FILES, "");
+ env.put(ENV_FLINK_CLASSPATH, "");
+ env.put(ENV_HADOOP_USER_NAME, "foo");
+ env.put(FLINK_JAR_PATH, root.toURI().toString());
+ }
+
+ @After
+ public void teardown() {
+ env.clear();
+ }
+
+ static class TestingYarnResourceManager extends YarnResourceManager {
+ public AMRMClientAsync<AMRMClient.ContainerRequest> mockResourceManagerClient;
+ public NMClient mockNMClient;
+
+ public TestingYarnResourceManager(
+ RpcService rpcService,
+ String resourceManagerEndpointId,
+ ResourceID resourceId,
+ Configuration flinkConfig,
+ Map<String, String> env,
+ ResourceManagerConfiguration resourceManagerConfiguration,
+ HighAvailabilityServices highAvailabilityServices,
+ HeartbeatServices heartbeatServices,
+ SlotManager slotManager,
+ MetricRegistry metricRegistry,
+ JobLeaderIdService jobLeaderIdService,
+ FatalErrorHandler fatalErrorHandler,
+ AMRMClientAsync<AMRMClient.ContainerRequest> mockResourceManagerClient,
+ NMClient mockNMClient) {
+ super(rpcService, resourceManagerEndpointId, resourceId, flinkConfig, env,
+ resourceManagerConfiguration, highAvailabilityServices, heartbeatServices,
+ slotManager, metricRegistry, jobLeaderIdService, fatalErrorHandler);
+ this.mockNMClient = mockNMClient;
+ this.mockResourceManagerClient = mockResourceManagerClient;
+ }
+
+ public void runInMainThread(Runnable runnable) {
+ super.getMainThreadExecutor().execute(runnable);
+ }
+
+ public MainThreadExecutor getMainThreadExecutorForTesting() {
+ return super.getMainThreadExecutor();
+ }
+
+ @Override
+ protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceManagerClient() {
+ return mockResourceManagerClient;
+ }
+
+ @Override
+ protected NMClient createAndStartNodeManagerClient() {
+ return mockNMClient;
+ }
+ }
+
+ static class Context {
+
+ // services
+ final TestingRpcService rpcService;
+ final TestingFatalErrorHandler fatalErrorHandler;
+ final MockResourceManagerRuntimeServices rmServices;
+
+ // RM
+ final ResourceManagerConfiguration rmConfiguration;
+ final ResourceID rmResourceID;
+ static final String RM_ADDRESS = "resourceManager";
+ final TestingYarnResourceManager resourceManager;
+
+ final int dataPort = 1234;
+ final HardwareDescription hardwareDescription = new HardwareDescription(1, 2L, 3L, 4L);
+
+ // domain objects for test purposes
+ final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 200);
+
+ public ContainerId task = ContainerId.newInstance(
+ ApplicationAttemptId.newInstance(ApplicationId.newInstance(1L, 0), 0), 1);
+ public String taskHost = "host1";
+
+ SlotReport slotReport = new SlotReport();
+
+ public NMClient mockNMClient = mock(NMClient.class);
+ public AMRMClientAsync<AMRMClient.ContainerRequest> mockResourceManagerClient =
+ mock(AMRMClientAsync.class);
+
+ /**
+ * Create mock RM dependencies.
+ */
+ Context() throws Exception {
+ rpcService = new TestingRpcService();
+ fatalErrorHandler = new TestingFatalErrorHandler();
+ rmServices = new MockResourceManagerRuntimeServices();
+
+ // resource manager
+ rmConfiguration = new ResourceManagerConfiguration(
+ Time.seconds(5L),
+ Time.seconds(5L));
+ rmResourceID = ResourceID.generate();
+ resourceManager =
+ new TestingYarnResourceManager(
+ rpcService,
+ RM_ADDRESS,
+ rmResourceID,
+ flinkConfig,
+ env,
+ rmConfiguration,
+ rmServices.highAvailabilityServices,
+ rmServices.heartbeatServices,
+ rmServices.slotManager,
+ rmServices.metricRegistry,
+ rmServices.jobLeaderIdService,
+ fatalErrorHandler,
+ mockResourceManagerClient,
+ mockNMClient
+ );
+ }
+
+ /**
+ * Mock services needed by the resource manager.
+ */
+ class MockResourceManagerRuntimeServices {
+
+ public final ScheduledExecutor scheduledExecutor;
+ public final TestingHighAvailabilityServices highAvailabilityServices;
+ public final HeartbeatServices heartbeatServices;
+ public final MetricRegistry metricRegistry;
+ public final TestingLeaderElectionService rmLeaderElectionService;
+ public final JobLeaderIdService jobLeaderIdService;
+ public final SlotManager slotManager;
+
+ public UUID rmLeaderSessionId;
+
+ MockResourceManagerRuntimeServices() throws Exception {
+ scheduledExecutor = mock(ScheduledExecutor.class);
+ highAvailabilityServices = new TestingHighAvailabilityServices();
+ rmLeaderElectionService = new TestingLeaderElectionService();
+ highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
+ heartbeatServices = new TestingHeartbeatServices(5L, 5L, scheduledExecutor);
+ metricRegistry = new NoOpMetricRegistry();
+ slotManager = new SlotManager(
+ new ScheduledExecutorServiceAdapter(new DirectScheduledExecutorService()),
+ Time.seconds(10), Time.seconds(10), Time.minutes(1));
+ jobLeaderIdService = new JobLeaderIdService(
+ highAvailabilityServices,
+ rpcService.getScheduledExecutor(),
+ Time.minutes(5L));
+ }
+
+ public void grantLeadership() throws Exception {
+ rmLeaderSessionId = UUID.randomUUID();
+ rmLeaderElectionService.isLeader(rmLeaderSessionId).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+ }
+ }
+
+ /**
+ * Start the resource manager and grant leadership to it.
+ */
+ public void startResourceManager() throws Exception {
+ resourceManager.start();
+ rmServices.grantLeadership();
+ }
+
+ /**
+ * Stop the Akka actor system.
+ */
+ public void stopResourceManager() throws Exception {
+ rpcService.stopService();
+ }
+ }
+
+ static class TestContainer extends UtilsTest.TestingContainer {
+ Resource resource;
+ Priority priority;
+
+ TestContainer(String host, int port, int containerId) {
+ super(host, port, containerId);
+ }
+
+ @Override
+ public Resource getResource() {
+ return resource;
+ }
+
+ @Override
+ public void setResource(Resource resource) {
+ this.resource = resource;
+ }
+
+ @Override
+ public Priority getPriority() {
+ return priority;
+ }
+
+ @Override
+ public void setPriority(Priority priority) {
+ this.priority = priority;
+ }
+ }
+
+ @Test
+ public void testStopWorker() throws Exception {
+ new Context() {{
+ startResourceManager();
+ // Request slot from SlotManager.
+ resourceManager.runInMainThread(() -> {
+ try {
+ rmServices.slotManager.registerSlotRequest(
+ new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
+ } catch (SlotManagerException e) {
+ log.error("registerSlotRequest: {}", e);
+ fail("registerSlotRequest should not throw exception.");
+ }
+ });
+
+ // Callback from YARN when container is allocated.
+ Container testingContainer = new TestContainer(taskHost, 1234, 1);
+ testingContainer.setResource(Resource.newInstance(200, 1));
+ testingContainer.setPriority(Priority.UNDEFINED);
+ resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
+ verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+ verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));
+
+ // Remote task executor registers with YarnResourceManager.
+ TaskExecutorGateway mockTaskExecutorGateway = mock(TaskExecutorGateway.class);
+ rpcService.registerGateway(taskHost, mockTaskExecutorGateway);
+ final ResourceManagerGateway rmGateway =
+ resourceManager.getSelfGateway(ResourceManagerGateway.class);
+ rmGateway.registerTaskExecutor(taskHost,
+ new ResourceID(testingContainer.getId().toString()),
+ slotReport,
+ dataPort,
+ hardwareDescription,
+ Time.seconds(10)).handleAsync(
+ (RegistrationResponse response, Throwable throwable) -> {
+ assertTrue(rmServices.slotManager.getNumberRegisteredSlots() == 1);
+ return null;
+ }, resourceManager.getMainThreadExecutorForTesting());
+
+ // Unregister all task executors and release all containers.
+ resourceManager.runInMainThread(() -> {
+ rmServices.slotManager.unregisterTaskManagersAndReleaseResources();
+ try {
+ verify(mockNMClient).stopContainer(any(ContainerId.class), any(NodeId.class));
+ } catch (Exception e) {
+ fail("stopContainer() should not throw exception.");
+ }
+ verify(mockResourceManagerClient).releaseAssignedContainer(any(ContainerId.class));
+ });
+ final CompletableFuture<Void> barrier = new CompletableFuture<>();
+ // Wait for all above operations to complete.
+ resourceManager.runInMainThread(() -> barrier.complete(null));
+ barrier.get();
+ stopResourceManager();
+
+ // It's now safe to access the SlotManager state since the ResourceManager has been stopped.
+ assertTrue(rmServices.slotManager.getNumberRegisteredSlots() == 0);
+ assertTrue(resourceManager.getNumberOfRegisteredTaskManagers().get() == 0);
+ }};
+ }
+}
[2/2] flink git commit: [FLINK-7076] [tests] Harden YarnResourcemanagerTest#testStopWorker to properly wait for concurrent operations
Posted by tr...@apache.org.
[FLINK-7076] [tests] Harden YarnResourcemanagerTest#testStopWorker to properly wait for concurrent operations
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cd532516
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cd532516
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cd532516
Branch: refs/heads/master
Commit: cd532516189b801bb02650665cbb109f8d8f8887
Parents: 59e3b01
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Nov 8 13:29:00 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 8 15:51:17 2017 +0100
----------------------------------------------------------------------
.../apache/flink/yarn/YarnResourceManager.java | 4 +-
.../flink/yarn/YarnResourceManagerTest.java | 77 +++++++++++---------
2 files changed, 44 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cd532516/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 7fa0e30..c900c83 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -245,7 +245,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
public boolean stopWorker(YarnWorkerNode workerNode) {
if (workerNode != null) {
Container container = workerNode.getContainer();
- log.info("Stopping container {}.", container.getId().toString());
+ log.info("Stopping container {}.", container.getId());
// release the container on the node manager
try {
nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
@@ -255,7 +255,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
resourceManagerClient.releaseAssignedContainer(container.getId());
workerNodeMap.remove(workerNode.getResourceID());
} else {
- log.error("Can not find container with resource ID {}.", workerNode.getResourceID().toString());
+ log.error("Can not find container with resource ID {}.", workerNode.getResourceID());
}
return true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cd532516/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index 808bc2b..18a358f 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -41,15 +42,14 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
-
import org.apache.flink.util.TestLogger;
import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
@@ -77,6 +77,7 @@ import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -86,8 +87,8 @@ import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES;
import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME;
import static org.apache.flink.yarn.YarnConfigKeys.FLINK_JAR_PATH;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
@@ -156,8 +157,8 @@ public class YarnResourceManagerTest extends TestLogger {
this.mockResourceManagerClient = mockResourceManagerClient;
}
- public void runInMainThread(Runnable runnable) {
- super.getMainThreadExecutor().execute(runnable);
+ public <T> CompletableFuture<T> runInMainThread(Callable<T> callable) {
+ return callAsync(callable, timeout);
}
public MainThreadExecutor getMainThreadExecutorForTesting() {
@@ -198,8 +199,6 @@ public class YarnResourceManagerTest extends TestLogger {
ApplicationAttemptId.newInstance(ApplicationId.newInstance(1L, 0), 0), 1);
public String taskHost = "host1";
- SlotReport slotReport = new SlotReport();
-
public NMClient mockNMClient = mock(NMClient.class);
public AMRMClientAsync<AMRMClient.ContainerRequest> mockResourceManagerClient =
mock(AMRMClientAsync.class);
@@ -323,16 +322,15 @@ public class YarnResourceManagerTest extends TestLogger {
new Context() {{
startResourceManager();
// Request slot from SlotManager.
- resourceManager.runInMainThread(() -> {
- try {
- rmServices.slotManager.registerSlotRequest(
- new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
- } catch (SlotManagerException e) {
- log.error("registerSlotRequest: {}", e);
- fail("registerSlotRequest should not throw exception.");
- }
+ CompletableFuture<?> registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
+ rmServices.slotManager.registerSlotRequest(
+ new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
+ return null;
});
+ // wait for the registerSlotRequest completion
+ registerSlotRequestFuture.get();
+
// Callback from YARN when container is allocated.
Container testingContainer = new TestContainer(taskHost, 1234, 1);
testingContainer.setResource(Resource.newInstance(200, 1));
@@ -344,33 +342,42 @@ public class YarnResourceManagerTest extends TestLogger {
// Remote task executor registers with YarnResourceManager.
TaskExecutorGateway mockTaskExecutorGateway = mock(TaskExecutorGateway.class);
rpcService.registerGateway(taskHost, mockTaskExecutorGateway);
- final ResourceManagerGateway rmGateway =
- resourceManager.getSelfGateway(ResourceManagerGateway.class);
- rmGateway.registerTaskExecutor(taskHost,
- new ResourceID(testingContainer.getId().toString()),
+
+ final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
+
+ final ResourceID taskManagerResourceId = new ResourceID(testingContainer.getId().toString());
+ final SlotReport slotReport = new SlotReport(
+ new SlotStatus(
+ new SlotID(taskManagerResourceId, 1),
+ new ResourceProfile(10, 1, 1, 1)));
+
+ CompletableFuture<Integer> numberRegisteredSlotsFuture = rmGateway
+ .registerTaskExecutor(
+ taskHost,
+ taskManagerResourceId,
slotReport,
dataPort,
hardwareDescription,
- Time.seconds(10)).handleAsync(
- (RegistrationResponse response, Throwable throwable) -> {
- assertTrue(rmServices.slotManager.getNumberRegisteredSlots() == 1);
- return null;
- }, resourceManager.getMainThreadExecutorForTesting());
+ Time.seconds(10L))
+ .handleAsync(
+ (RegistrationResponse response, Throwable throwable) -> rmServices.slotManager.getNumberRegisteredSlots(),
+ resourceManager.getMainThreadExecutorForTesting());
+
+ final int numberRegisteredSlots = numberRegisteredSlotsFuture.get();
+
+ assertEquals(1, numberRegisteredSlots);
// Unregister all task executors and release all containers.
- resourceManager.runInMainThread(() -> {
+ CompletableFuture<?> unregisterAndReleaseFuture = resourceManager.runInMainThread(() -> {
rmServices.slotManager.unregisterTaskManagersAndReleaseResources();
- try {
- verify(mockNMClient).stopContainer(any(ContainerId.class), any(NodeId.class));
- } catch (Exception e) {
- fail("stopContainer() should not throw exception.");
- }
- verify(mockResourceManagerClient).releaseAssignedContainer(any(ContainerId.class));
+ return null;
});
- final CompletableFuture<Void> barrier = new CompletableFuture<>();
- // Wait for all above operations to complete.
- resourceManager.runInMainThread(() -> barrier.complete(null));
- barrier.get();
+
+ unregisterAndReleaseFuture.get();
+
+ verify(mockNMClient).stopContainer(any(ContainerId.class), any(NodeId.class));
+ verify(mockResourceManagerClient).releaseAssignedContainer(any(ContainerId.class));
+
stopResourceManager();
// It's now safe to access the SlotManager state since the ResourceManager has been stopped.