You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/05 21:55:17 UTC

flink git commit: [FLINK-4927] [yarn] Refine YARN resource manager according to Till's comments

Repository: flink
Updated Branches:
  refs/heads/flip-6 55e94c3c6 -> 8c448e8f5


[FLINK-4927] [yarn] Refine YARN resource manager according to Till's comments


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8c448e8f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8c448e8f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8c448e8f

Branch: refs/heads/flip-6
Commit: 8c448e8f5d091258a2eec5348f879007ac72e288
Parents: 55e94c3
Author: shuai.xus <sh...@alibaba-inc.com>
Authored: Mon Dec 5 15:36:16 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 5 19:10:35 2016 +0100

----------------------------------------------------------------------
 .../apache/flink/yarn/YarnResourceManager.java  | 68 +++++++++++++++-----
 1 file changed, 51 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8c448e8f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 6280bdf..9b9ea39 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.util.Records;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;
+import org.apache.flink.util.ExceptionUtils;
 
 import java.io.File;
 import java.io.IOException;
@@ -79,21 +80,24 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 	/** The process environment variables */
 	private final Map<String, String> ENV;
 
+	/** The default registration timeout for task executors, in seconds. */
+	private static final int DEFAULT_TASK_MANAGER_REGISTRATION_DURATION = 300;
+
 	/** The heartbeat interval while the resource master is waiting for containers */
 	private static final int FAST_YARN_HEARTBEAT_INTERVAL_MS = 500;
 
 	/** The default heartbeat interval during regular operation */
 	private static final int DEFAULT_YARN_HEARTBEAT_INTERVAL_MS = 5000;
 
-	/** The maximum time that TaskExecutors may be waiting to register at the ResourceManager before they quit */
-	private static final FiniteDuration TASKEXECUTOR_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
+	/** The default memory (in MB) to request for a task executor when the profile does not specify it */
+	private static final int DEFAULT_TSK_EXECUTOR_MEMORY_SIZE = 1024;
 
 	/** Environment variable name of the final container id used by the YarnResourceManager.
 	 * Container ID generation may vary across Hadoop versions. */
 	final static String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID";
 	
-	/** Environment variable name of the hostname used by the Yarn.
-	 * TaskExecutor use this host name to start port. */
+	/** Environment variable name of the hostname given by YARN.
+	 * The task executor uses the hostname given by YARN consistently throughout Akka. */
 	final static String ENV_FLINK_NODE_ID = "_FLINK_NODE_ID";
 
 	/** Default heartbeat interval between this resource manager and the YARN ResourceManager */
@@ -112,6 +116,9 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 	/** The number of containers requested, but not yet granted */
 	private int numPendingContainerRequests;
 
+	/** The YARN request priority assigned to each distinct resource profile, see generatePriority() */
+	private final Map<ResourceProfile, Integer> resourcePriorities = new HashMap<>();
+
 	public YarnResourceManager(
 			Configuration flinkConfig,
 			Map<String, String> env,
@@ -173,20 +179,38 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 	@Override
 	public void shutDown() throws Exception {
 		// shut down all components
+		// remember the first failure but keep going, so that every component still gets stopped
+		Throwable firstException = null;
 		if (resourceManagerClient != null) {
 			try {
 				resourceManagerClient.stop();
 			} catch (Throwable t) {
-				LOG.error("Could not cleanly shut down the Asynchronous Resource Manager Client", t);
+				firstException = t;
 			}
 		}
 		if (nodeManagerClient != null) {
 			try {
 				nodeManagerClient.stop();
 			} catch (Throwable t) {
-				LOG.error("Could not cleanly shut down the Node Manager Client", t);
+				if (firstException == null) {
+					firstException = t;
+				} else {
+					firstException.addSuppressed(t);
+				}
 			}
 		}
-		super.shutDown();
+		// the parent resource manager must be shut down even if stopping a YARN client failed
+		try {
+			super.shutDown();
+		} catch (Throwable t) {
+			if (firstException == null) {
+				firstException = t;
+			} else {
+				firstException.addSuppressed(t);
+			}
+		}
+		if (firstException != null) {
+			ExceptionUtils.rethrowException(firstException, "Error while shutting down YARN resource manager");
+		}
 	}
 
@@ -207,13 +221,12 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 	public void startNewWorker(ResourceProfile resourceProfile) {
 		// Priority for worker containers - priorities are intra-application
-		//TODO: set priority according to the resource allocated
-		Priority priority = Priority.newInstance(0);
-		int mem = resourceProfile.getMemoryInMB() <= Integer.MAX_VALUE ? (int)resourceProfile.getMemoryInMB() : Integer.MAX_VALUE;
-		if (mem < 0) {
-			mem = 1024;
-		}
-		int vcore = resourceProfile.getCpuCores() < 1 ? 1 : (int)resourceProfile.getCpuCores() + 1;
-		Resource capability = Resource.newInstance(mem , vcore);
+		// every distinct resource profile is requested with its own priority, see generatePriority()
+		Priority priority = Priority.newInstance(generatePriority(resourceProfile));
+		// Resource.newInstance takes an int: clamp large values, fall back to the default for negative ones
+		int mem = resourceProfile.getMemoryInMB() < 0 ? DEFAULT_TSK_EXECUTOR_MEMORY_SIZE :
+				(int) Math.min(resourceProfile.getMemoryInMB(), Integer.MAX_VALUE);
+		int vcore = resourceProfile.getCpuCores() < 1 ? 1 : (int)resourceProfile.getCpuCores();
+		Resource capability = Resource.newInstance(mem, vcore);
 		requestYarnContainer(capability, priority);
 	}
 
@@ -234,7 +245,6 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 		for (ContainerStatus container : list) {
 			if (container.getExitStatus() < 0) {
 				notifyWorkerFailed(new ResourceID(container.getContainerId().toString()), container.getDiagnostics());
-				// TODO: notice job master slot fail
 			}
 		}
 	}
@@ -253,7 +263,7 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 			}
 			catch (Throwable t) {
 				// failed to launch the container, will release the failed one and ask for a new one
-				LOG.error("Could not start TaskManager in container " + container, t);
+				LOG.error("Could not start TaskManager in container {},", container, t);
 				resourceManagerClient.releaseAssignedContainer(container.getId());
 				requestYarnContainer(container.getResource(), container.getPriority());
 			}
@@ -265,7 +275,12 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 
 	@Override
 	public void onShutdownRequest() {
-		// Nothing to do
+		// this callback cannot throw checked exceptions, so a failed shutdown can only be logged
+		try {
+			shutDown();
+		} catch (Exception e) {
+			LOG.warn("Failed to shut down the YARN resource manager.", e);
+		}
 	}
 
 	@Override
@@ -336,8 +350,11 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 				taskManagerParameters.taskManagerTotalMemoryMB(),
 				taskManagerParameters.taskManagerHeapSizeMB(),
 				taskManagerParameters.taskManagerDirectMemoryLimitMB());
+		int timeout = flinkConfig.getInteger(ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, 
+				DEFAULT_TASK_MANAGER_REGISTRATION_DURATION);
+		FiniteDuration teRegistrationTimeout = new FiniteDuration(timeout, TimeUnit.SECONDS);
 		final Configuration taskManagerConfig = BootstrapTools.generateTaskManagerConfiguration(
-				flinkConfig, "", 0, 1, TASKEXECUTOR_REGISTRATION_TIMEOUT);
+				flinkConfig, "", 0, 1, teRegistrationTimeout);
 		LOG.debug("TaskManager configuration: {}", taskManagerConfig);
 
 		ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorContext(
@@ -549,4 +566,21 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 
 		return ctx;
 	}
+	
+	/**
+	 * Maps a resource profile to the YARN priority used when requesting containers for it.
+	 * Priorities only serve to tell requests for different resources apart: a profile seen
+	 * for the first time is assigned the number of profiles known so far (0, 1, 2, ...).
+	 * @param resourceProfile The resource profile of a request
+	 * @return The priority of this resource profile.
+	 */
+	private int generatePriority(ResourceProfile resourceProfile) {
+		Integer priority = resourcePriorities.get(resourceProfile);
+		if (priority == null) {
+			priority = resourcePriorities.size();
+			resourcePriorities.put(resourceProfile, priority);
+		}
+		return priority;
+	}
+
 }