You are viewing a plain-text version of this content. (The link to the canonical version is not included in this plain-text copy.)
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:23:06 UTC
[47/52] [abbrv] flink git commit: [FLINK-4927] [yarn] refine YARN Resource manager according to Till's comments
[FLINK-4927] [yarn] refine YARN Resource manager according to Till's comments
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e2922add
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e2922add
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e2922add
Branch: refs/heads/master
Commit: e2922add100338776db765a62deb02f556845cf9
Parents: 371997a
Author: shuai.xus <sh...@alibaba-inc.com>
Authored: Mon Dec 5 15:36:16 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:27 2016 +0100
----------------------------------------------------------------------
.../apache/flink/yarn/YarnResourceManager.java | 68 +++++++++++++++-----
1 file changed, 51 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e2922add/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 6280bdf..9b9ea39 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
+import org.apache.flink.util.ExceptionUtils;
import java.io.File;
import java.io.IOException;
@@ -79,21 +80,24 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
/** The process environment variables */
private final Map<String, String> ENV;
+ /** The default registration timeout for task executor in seconds. */
+ private final static int DEFAULT_TASK_MANAGER_REGISTRATION_DURATION = 300;
+
/** The heartbeat interval while the resource master is waiting for containers */
private static final int FAST_YARN_HEARTBEAT_INTERVAL_MS = 500;
/** The default heartbeat interval during regular operation */
private static final int DEFAULT_YARN_HEARTBEAT_INTERVAL_MS = 5000;
- /** The maximum time that TaskExecutors may be waiting to register at the ResourceManager before they quit */
- private static final FiniteDuration TASKEXECUTOR_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
+ /** The default memory of task executor to allocate (in MB) */
+ private static final int DEFAULT_TSK_EXECUTOR_MEMORY_SIZE = 1024;
/** Environment variable name of the final container id used by the YarnResourceManager.
* Container ID generation may vary across Hadoop versions. */
final static String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID";
- /** Environment variable name of the hostname used by the Yarn.
- * TaskExecutor use this host name to start port. */
+ /** Environment variable name of the hostname given by the YARN.
+ * In task executor we use the hostnames given by YARN consistently throughout akka */
final static String ENV_FLINK_NODE_ID = "_FLINK_NODE_ID";
/** Default heartbeat interval between this resource manager and the YARN ResourceManager */
@@ -112,6 +116,8 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
/** The number of containers requested, but not yet granted */
private int numPendingContainerRequests;
+ final private Map<ResourceProfile, Integer> resourcePriorities = new HashMap<>();
+
public YarnResourceManager(
Configuration flinkConfig,
Map<String, String> env,
@@ -173,20 +179,28 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
@Override
public void shutDown() throws Exception {
// shut down all components
+ Throwable firstException = null;
if (resourceManagerClient != null) {
try {
resourceManagerClient.stop();
} catch (Throwable t) {
- LOG.error("Could not cleanly shut down the Asynchronous Resource Manager Client", t);
+ firstException = t;
}
}
if (nodeManagerClient != null) {
try {
nodeManagerClient.stop();
} catch (Throwable t) {
- LOG.error("Could not cleanly shut down the Node Manager Client", t);
+ if (firstException == null) {
+ firstException = t;
+ } else {
+ firstException.addSuppressed(t);
+ }
}
}
+ if (firstException != null) {
+ ExceptionUtils.rethrowException(firstException, "Error while shutting down YARN resource manager");
+ }
super.shutDown();
}
@@ -207,13 +221,10 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
public void startNewWorker(ResourceProfile resourceProfile) {
// Priority for worker containers - priorities are intra-application
//TODO: set priority according to the resource allocated
- Priority priority = Priority.newInstance(0);
- int mem = resourceProfile.getMemoryInMB() <= Integer.MAX_VALUE ? (int)resourceProfile.getMemoryInMB() : Integer.MAX_VALUE;
- if (mem < 0) {
- mem = 1024;
- }
- int vcore = resourceProfile.getCpuCores() < 1 ? 1 : (int)resourceProfile.getCpuCores() + 1;
- Resource capability = Resource.newInstance(mem , vcore);
+ Priority priority = Priority.newInstance(generatePriority(resourceProfile));
+ int mem = resourceProfile.getMemoryInMB() < 0 ? DEFAULT_TSK_EXECUTOR_MEMORY_SIZE : (int)resourceProfile.getMemoryInMB();
+ int vcore = resourceProfile.getCpuCores() < 1 ? 1 : (int)resourceProfile.getCpuCores();
+ Resource capability = Resource.newInstance(mem, vcore);
requestYarnContainer(capability, priority);
}
@@ -234,7 +245,6 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
for (ContainerStatus container : list) {
if (container.getExitStatus() < 0) {
notifyWorkerFailed(new ResourceID(container.getContainerId().toString()), container.getDiagnostics());
- // TODO: notice job master slot fail
}
}
}
@@ -253,7 +263,7 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
}
catch (Throwable t) {
// failed to launch the container, will release the failed one and ask for a new one
- LOG.error("Could not start TaskManager in container " + container, t);
+ LOG.error("Could not start TaskManager in container {},", container, t);
resourceManagerClient.releaseAssignedContainer(container.getId());
requestYarnContainer(container.getResource(), container.getPriority());
}
@@ -265,7 +275,11 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
@Override
public void onShutdownRequest() {
- // Nothing to do
+ try {
+ shutDown();
+ } catch (Exception e) {
+ LOG.warn("Fail to shutdown the YARN resource manager.", e);
+ }
}
@Override
@@ -336,8 +350,11 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
taskManagerParameters.taskManagerTotalMemoryMB(),
taskManagerParameters.taskManagerHeapSizeMB(),
taskManagerParameters.taskManagerDirectMemoryLimitMB());
+ int timeout = flinkConfig.getInteger(ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION,
+ DEFAULT_TASK_MANAGER_REGISTRATION_DURATION);
+ FiniteDuration teRegistrationTimeout = new FiniteDuration(timeout, TimeUnit.SECONDS);
final Configuration taskManagerConfig = BootstrapTools.generateTaskManagerConfiguration(
- flinkConfig, "", 0, 1, TASKEXECUTOR_REGISTRATION_TIMEOUT);
+ flinkConfig, "", 0, 1, teRegistrationTimeout);
LOG.debug("TaskManager configuration: {}", taskManagerConfig);
ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorContext(
@@ -549,4 +566,21 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
return ctx;
}
+
+ /**
+ * Generate priority by given resouce profile.
+ * Priority is only used for distinguishing request of different resource.
+ * @param resourceProfile The resource profile of a request
+ * @return The priority of this resource profile.
+ */
+ private int generatePriority(ResourceProfile resourceProfile) {
+ if (resourcePriorities.containsKey(resourceProfile)) {
+ return resourcePriorities.get(resourceProfile);
+ } else {
+ int priority = resourcePriorities.size();
+ resourcePriorities.put(resourceProfile, priority);
+ return priority;
+ }
+ }
+
}