You are viewing a plain text version of this content. The canonical link to it was not preserved in this plain-text version.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/05 01:59:50 UTC
[01/10] flink git commit: [FLINK-4928] [yarn] Implement FLIP-6 YARN Application Master Runner
Repository: flink
Updated Branches:
refs/heads/flip-6 4afcc4abd -> 55e94c3c6
[FLINK-4928] [yarn] Implement FLIP-6 YARN Application Master Runner
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e57fba0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e57fba0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e57fba0
Branch: refs/heads/flip-6
Commit: 8e57fba073be139f69e072bdb4888d582fa7211a
Parents: e11ea3f
Author: shuai.xus <sh...@alibaba-inc.com>
Authored: Thu Nov 3 16:24:47 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 5 02:49:43 2016 +0100
----------------------------------------------------------------------
.../resourcemanager/ResourceManager.java | 2 +-
.../apache/flink/yarn/YarnResourceManager.java | 552 +++++++++++++++++++
2 files changed, 553 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8e57fba0/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 76b4a86..3bcbfda 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -637,7 +637,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
*
* @param t The exception describing the fatal error
*/
- void onFatalErrorAsync(final Throwable t) {
+ protected void onFatalErrorAsync(final Throwable t) {
runAsync(new Runnable() {
@Override
public void run() {
http://git-wip-us.apache.org/repos/asf/flink/blob/8e57fba0/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
new file mode 100644
index 0000000..6280bdf
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -0,0 +1,552 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+
+/**
+ * The yarn implementation of the resource manager. Used when the system is started
+ * via the resource framework YARN.
+ */
+public class YarnResourceManager extends ResourceManager<ResourceID> implements AMRMClientAsync.CallbackHandler {
+ protected final Logger LOG = LoggerFactory.getLogger(getClass());
+
+ /** The process environment variables of this application master */
+ private final Map<String, String> env;
+
+ /** The heartbeat interval while the resource master is waiting for containers */
+ private static final int FAST_YARN_HEARTBEAT_INTERVAL_MS = 500;
+
+ /** The default heartbeat interval during regular operation */
+ private static final int DEFAULT_YARN_HEARTBEAT_INTERVAL_MS = 5000;
+
+ /** The maximum time that TaskExecutors may be waiting to register at the ResourceManager before they quit */
+ private static final FiniteDuration TASKEXECUTOR_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
+
+ /** Environment variable name of the final container id used by the YarnResourceManager.
+ * Container ID generation may vary across Hadoop versions. */
+ static final String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID";
+
+ /** Environment variable name under which the host of the node running the container
+ * is passed to the TaskExecutor. */
+ static final String ENV_FLINK_NODE_ID = "_FLINK_NODE_ID";
+
+ /** Default heartbeat interval between this resource manager and the YARN ResourceManager */
+ private final int yarnHeartbeatIntervalMillis;
+
+ private final Configuration flinkConfig;
+
+ private final YarnConfiguration yarnConfig;
+
+ /** Client to communicate with the Resource Manager (YARN's master) */
+ private AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient;
+
+ /** Client to communicate with the Node manager and launch TaskExecutor processes */
+ private NMClient nodeManagerClient;
+
+ /** The number of containers requested, but not yet granted */
+ private int numPendingContainerRequests;
+
+ public YarnResourceManager(
+ Configuration flinkConfig,
+ Map<String, String> env,
+ RpcService rpcService,
+ ResourceManagerConfiguration resourceManagerConfiguration,
+ HighAvailabilityServices highAvailabilityServices,
+ SlotManagerFactory slotManagerFactory,
+ MetricRegistry metricRegistry,
+ JobLeaderIdService jobLeaderIdService,
+ FatalErrorHandler fatalErrorHandler) {
+ super(
+ rpcService,
+ resourceManagerConfiguration,
+ highAvailabilityServices,
+ slotManagerFactory,
+ metricRegistry,
+ jobLeaderIdService,
+ fatalErrorHandler);
+ this.flinkConfig = flinkConfig;
+ this.yarnConfig = new YarnConfiguration();
+ this.env = env;
+ final int yarnHeartbeatIntervalMS = flinkConfig.getInteger(
+ ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, DEFAULT_YARN_HEARTBEAT_INTERVAL_MS / 1000) * 1000;
+
+ final long yarnExpiryIntervalMS = yarnConfig.getLong(
+ YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS);
+
+ if (yarnHeartbeatIntervalMS >= yarnExpiryIntervalMS) {
+ LOG.warn("The heartbeat interval of the Flink Application master ({}) is greater " +
+ "than YARN's expiry interval ({}). The application is likely to be killed by YARN.",
+ yarnHeartbeatIntervalMS, yarnExpiryIntervalMS);
+ }
+ yarnHeartbeatIntervalMillis = yarnHeartbeatIntervalMS;
+ numPendingContainerRequests = 0;
+ }
+
+ @Override
+ protected void initialize() throws ResourceManagerException { // starts the AM-RM client, registers at YARN, starts the NM client
+ resourceManagerClient = AMRMClientAsync.createAMRMClientAsync(yarnHeartbeatIntervalMillis, this);
+ resourceManagerClient.init(yarnConfig);
+ resourceManagerClient.start();
+ try {
+ //TODO: change akka address to tcp host and port, the getAddress() interface should return a standard tcp address
+ Tuple2<String, Integer> hostPort = parseHostPort(getAddress());
+ //TODO: the third parameter should be the webmonitor address
+ resourceManagerClient.registerApplicationMaster(hostPort.f0, hostPort.f1, getAddress());
+ } catch (Exception e) {
+ throw new ResourceManagerException("Could not register the application master at YARN.", e);
+ }
+
+ // create the client to communicate with the node managers
+ nodeManagerClient = NMClient.createNMClient();
+ nodeManagerClient.init(yarnConfig);
+ nodeManagerClient.start();
+ nodeManagerClient.cleanupRunningContainersOnStop(true);
+ }
+
+ @Override
+ public void shutDown() throws Exception {
+ // shut down all components
+ if (resourceManagerClient != null) {
+ try {
+ resourceManagerClient.stop();
+ } catch (Throwable t) {
+ LOG.error("Could not cleanly shut down the Asynchronous Resource Manager Client", t);
+ }
+ }
+ if (nodeManagerClient != null) {
+ try {
+ nodeManagerClient.stop();
+ } catch (Throwable t) {
+ LOG.error("Could not cleanly shut down the Node Manager Client", t);
+ }
+ }
+ super.shutDown();
+ }
+
+ @Override
+ protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
+
+ // first, de-register from YARN
+ FinalApplicationStatus yarnStatus = getYarnStatus(finalStatus);
+ LOG.info("Unregistering application from the YARN Resource Manager");
+ try {
+ resourceManagerClient.unregisterApplicationMaster(yarnStatus, optionalDiagnostics, "");
+ } catch (Throwable t) {
+ LOG.error("Could not unregister the application master.", t);
+ }
+ }
+
+ @Override
+ public void startNewWorker(ResourceProfile resourceProfile) { // translates the Flink resource profile into a YARN container request
+ // Priority for worker containers - priorities are intra-application
+ //TODO: set priority according to the resource allocated
+ Priority priority = Priority.newInstance(0);
+ int mem = resourceProfile.getMemoryInMB() <= Integer.MAX_VALUE ? (int)resourceProfile.getMemoryInMB() : Integer.MAX_VALUE;
+ if (mem < 0) { // negative (unknown) memory: fall back to 1024 MB
+ mem = 1024;
+ }
+ int vcore = resourceProfile.getCpuCores() < 1 ? 1 : (int) Math.ceil(resourceProfile.getCpuCores());
+ Resource capability = Resource.newInstance(mem, vcore);
+ requestYarnContainer(capability, priority);
+ }
+
+ @Override
+ protected ResourceID workerStarted(ResourceID resourceID) { // workers are identified directly by their ResourceID
+ return resourceID;
+ }
+
+ // AMRMClientAsync CallbackHandler methods
+ @Override
+ public float getProgress() {
+ // progress tracking of requested/allocated containers is not implemented; always report 1
+ return 1;
+ }
+
+ @Override
+ public void onContainersCompleted(List<ContainerStatus> list) {
+ for (ContainerStatus container : list) {
+ if (container.getExitStatus() < 0) {
+ notifyWorkerFailed(new ResourceID(container.getContainerId().toString()), container.getDiagnostics());
+ // TODO: notify the job master of the slot failure; NOTE(review): completions with exit status >= 0 are not reported
+ }
+ }
+ }
+
+ @Override
+ public void onContainersAllocated(List<Container> containers) { // launches a TaskExecutor in every newly granted container
+ for (Container container : containers) {
+ numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
+ LOG.info("Received new container: {} - Remaining pending container requests: {}",
+ container.getId(), numPendingContainerRequests);
+ try {
+ // Context information used to start a TaskExecutor Java process
+ ContainerLaunchContext taskExecutorLaunchContext =
+ createTaskExecutorLaunchContext(container.getResource(), container.getId().toString(), container.getNodeId().getHost());
+ nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
+ }
+ catch (Throwable t) {
+ // failed to launch: release the container and ask for a new one (a deterministic failure will keep re-requesting)
+ LOG.error("Could not start TaskManager in container {}", container, t);
+ resourceManagerClient.releaseAssignedContainer(container.getId());
+ requestYarnContainer(container.getResource(), container.getPriority());
+ }
+ }
+ if (numPendingContainerRequests <= 0) {
+ resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
+ }
+ }
+
+ @Override
+ public void onShutdownRequest() {
+ // Nothing to do
+ }
+
+ @Override
+ public void onNodesUpdated(List<NodeReport> list) {
+ // We are not interested in node updates
+ }
+
+ @Override
+ public void onError(Throwable error) {
+ onFatalErrorAsync(error);
+ }
+
+ //Utility methods
+ /**
+ * Converts a Flink application status enum to a YARN application status enum.
+ * @param status The Flink application status (null maps to UNDEFINED).
+ * @return The corresponding YARN application status.
+ */
+ private FinalApplicationStatus getYarnStatus(ApplicationStatus status) {
+ if (status == null) {
+ return FinalApplicationStatus.UNDEFINED;
+ }
+ else {
+ switch (status) {
+ case SUCCEEDED:
+ return FinalApplicationStatus.SUCCEEDED;
+ case FAILED:
+ return FinalApplicationStatus.FAILED;
+ case CANCELED:
+ return FinalApplicationStatus.KILLED;
+ default:
+ return FinalApplicationStatus.UNDEFINED;
+ }
+ }
+ }
+
+ // parse the host and port from akka address,
+ // the akka address is like akka.tcp://flink@100.81.153.180:49712/user/$a
+ private static Tuple2<String, Integer> parseHostPort(String address) { // throws a RuntimeException if not of the form ...@host:port/...
+ String[] hostPort = address.split("@")[1].split(":");
+ String host = hostPort[0];
+ String port = hostPort[1].split("/")[0];
+ return new Tuple2<>(host, Integer.valueOf(port));
+ }
+
+ private void requestYarnContainer(Resource resource, Priority priority) { // also switches to the fast heartbeat
+ resourceManagerClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(resource, null, null, priority));
+ // make sure we transmit the request fast and receive fast news of granted allocations
+ resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
+
+ numPendingContainerRequests++;
+ LOG.info("Requesting new TaskManager container pending requests: {}",
+ numPendingContainerRequests);
+ }
+
+ private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource, String containerId, String host)
+ throws Exception { // builds the launch context and tags it with the container id and node host
+ // init the ContainerLaunchContext
+ final String currDir = env.get(ApplicationConstants.Environment.PWD.key());
+
+ final ContaineredTaskManagerParameters taskManagerParameters =
+ ContaineredTaskManagerParameters.create(flinkConfig, resource.getMemory(), 1);
+
+ LOG.info("TaskExecutor {} will be started with container size {} MB, JVM heap size {} MB, " +
+ "JVM direct memory limit {} MB",
+ containerId,
+ taskManagerParameters.taskManagerTotalMemoryMB(),
+ taskManagerParameters.taskManagerHeapSizeMB(),
+ taskManagerParameters.taskManagerDirectMemoryLimitMB());
+ final Configuration taskManagerConfig = BootstrapTools.generateTaskManagerConfiguration(
+ flinkConfig, "", 0, 1, TASKEXECUTOR_REGISTRATION_TIMEOUT);
+ LOG.debug("TaskManager configuration: {}", taskManagerConfig);
+
+ ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorContext(
+ flinkConfig, yarnConfig, env,
+ taskManagerParameters, taskManagerConfig,
+ currDir, YarnTaskExecutorRunner.class, LOG);
+
+ // set a special environment variable to uniquely identify this container
+ taskExecutorLaunchContext.getEnvironment()
+ .put(ENV_FLINK_CONTAINER_ID, containerId);
+ taskExecutorLaunchContext.getEnvironment()
+ .put(ENV_FLINK_NODE_ID, host);
+ return taskExecutorLaunchContext;
+ }
+
+
+ /**
+ * Creates the launch context, which describes how to bring up a TaskExecutor process in
+ * an allocated YARN container.
+ *
+ * <p>This code is extremely YARN specific and registers all the resources that the TaskExecutor
+ * needs (such as JAR file, config file, ...) and all environment variables in a YARN
+ * container launch context. The launch context then ensures that those resources will be
+ * copied into the container's transient working directory.
+ *
+ * @param flinkConfig
+ * The Flink configuration object.
+ * @param yarnConfig
+ * The YARN configuration object.
+ * @param env
+ * The environment variables.
+ * @param tmParams
+ * The TaskExecutor container memory parameters.
+ * @param taskManagerConfig
+ * The configuration for the TaskExecutors.
+ * @param workingDirectory
+ * The current application master container's working directory.
+ * @param taskManagerMainClass
+ * The class with the main method.
+ * @param log
+ * The logger.
+ *
+ * @return The launch context for the TaskManager processes.
+ *
+ * @throws Exception Thrown if the launch context could not be created, for example if
+ * the resources could not be copied.
+ */
+ private static ContainerLaunchContext createTaskExecutorContext(
+ Configuration flinkConfig,
+ YarnConfiguration yarnConfig,
+ Map<String, String> env,
+ ContaineredTaskManagerParameters tmParams,
+ Configuration taskManagerConfig,
+ String workingDirectory,
+ Class<?> taskManagerMainClass,
+ Logger log) throws Exception {
+
+ // get all relevant variables from the environment (they are not validated here)
+
+ String remoteFlinkJarPath = env.get(YarnConfigKeys.FLINK_JAR_PATH);
+
+ String appId = env.get(YarnConfigKeys.ENV_APP_ID);
+
+ String clientHomeDir = env.get(YarnConfigKeys.ENV_CLIENT_HOME_DIR);
+
+ String shipListString = env.get(YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
+
+ // ENV_HADOOP_USER_NAME is not read here; the container user is taken from the current UGI below
+
+ final String remoteKeytabPath = env.get(YarnConfigKeys.KEYTAB_PATH);
+ log.info("TM:remote keytab path obtained {}", remoteKeytabPath);
+
+ final String remoteKeytabPrincipal = env.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+ log.info("TM:remote keytab principal obtained {}", remoteKeytabPrincipal);
+
+ final String remoteYarnConfPath = env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH);
+ log.info("TM:remote yarn conf path obtained {}", remoteYarnConfPath);
+
+ final String remoteKrb5Path = env.get(YarnConfigKeys.ENV_KRB5_PATH);
+ log.info("TM:remote krb5 path obtained {}", remoteKrb5Path);
+
+ String classPathString = env.get(YarnConfigKeys.ENV_FLINK_CLASSPATH);
+
+ // obtain a handle to the file system used by YARN
+ final org.apache.hadoop.fs.FileSystem yarnFileSystem;
+ try {
+ yarnFileSystem = org.apache.hadoop.fs.FileSystem.get(yarnConfig);
+ } catch (IOException e) {
+ throw new Exception("Could not access YARN's default file system", e);
+ }
+
+ //register keytab
+ LocalResource keytabResource = null;
+ if(remoteKeytabPath != null) {
+ log.info("TM:Adding keytab {} to the container local resource bucket", remoteKeytabPath);
+ keytabResource = Records.newRecord(LocalResource.class);
+ Path keytabPath = new Path(remoteKeytabPath);
+ Utils.registerLocalResource(yarnFileSystem, keytabPath, keytabResource);
+ }
+
+ //To support Yarn Secure Integration Test Scenario
+ LocalResource yarnConfResource = null;
+ LocalResource krb5ConfResource = null;
+ boolean hasKrb5 = false;
+ if(remoteYarnConfPath != null && remoteKrb5Path != null) {
+ log.info("TM:Adding remoteYarnConfPath {} to the container local resource bucket", remoteYarnConfPath);
+ yarnConfResource = Records.newRecord(LocalResource.class);
+ Path yarnConfPath = new Path(remoteYarnConfPath);
+ Utils.registerLocalResource(yarnFileSystem, yarnConfPath, yarnConfResource);
+
+ log.info("TM:Adding remoteKrb5Path {} to the container local resource bucket", remoteKrb5Path);
+ krb5ConfResource = Records.newRecord(LocalResource.class);
+ Path krb5ConfPath = new Path(remoteKrb5Path);
+ Utils.registerLocalResource(yarnFileSystem, krb5ConfPath, krb5ConfResource);
+
+ hasKrb5 = true;
+ }
+
+ // register Flink Jar with remote HDFS
+ LocalResource flinkJar = Records.newRecord(LocalResource.class);
+ {
+ Path remoteJarPath = new Path(remoteFlinkJarPath);
+ Utils.registerLocalResource(yarnFileSystem, remoteJarPath, flinkJar);
+ }
+
+ // register conf with local fs
+ LocalResource flinkConf = Records.newRecord(LocalResource.class);
+ {
+ // write the TaskManager configuration to a local file
+ final File taskManagerConfigFile =
+ new File(workingDirectory, UUID.randomUUID() + "-taskmanager-conf.yaml");
+ log.debug("Writing TaskManager configuration to {}", taskManagerConfigFile.getAbsolutePath());
+ BootstrapTools.writeConfiguration(taskManagerConfig, taskManagerConfigFile);
+
+ Utils.setupLocalResource(yarnFileSystem, appId,
+ new Path(taskManagerConfigFile.toURI()), flinkConf, new Path(clientHomeDir));
+
+ log.info("Prepared local resource for modified yaml: {}", flinkConf);
+ }
+
+ Map<String, LocalResource> taskManagerLocalResources = new HashMap<>();
+ taskManagerLocalResources.put("flink.jar", flinkJar);
+ taskManagerLocalResources.put("flink-conf.yaml", flinkConf);
+
+ //To support Yarn Secure Integration Test Scenario
+ if(yarnConfResource != null && krb5ConfResource != null) {
+ taskManagerLocalResources.put(Utils.YARN_SITE_FILE_NAME, yarnConfResource);
+ taskManagerLocalResources.put(Utils.KRB5_FILE_NAME, krb5ConfResource);
+ }
+
+ if(keytabResource != null) {
+ taskManagerLocalResources.put(Utils.KEYTAB_FILE_NAME, keytabResource);
+ }
+
+ // prepare additional files to be shipped (an absent ship list means no extra files)
+ for (String pathStr : shipListString == null ? new String[0] : shipListString.split(",")) {
+ if (!pathStr.isEmpty()) {
+ LocalResource resource = Records.newRecord(LocalResource.class);
+ Path path = new Path(pathStr);
+ Utils.registerLocalResource(yarnFileSystem, path, resource);
+ taskManagerLocalResources.put(path.getName(), resource);
+ }
+ }
+
+ // now that all resources are prepared, we can create the launch context
+
+ log.info("Creating container launch context for TaskManagers");
+
+ boolean hasLogback = new File(workingDirectory, "logback.xml").exists();
+ boolean hasLog4j = new File(workingDirectory, "log4j.properties").exists();
+
+ String launchCommand = BootstrapTools.getTaskManagerShellCommand(
+ flinkConfig, tmParams, ".", ApplicationConstants.LOG_DIR_EXPANSION_VAR,
+ hasLogback, hasLog4j, hasKrb5, taskManagerMainClass);
+
+ log.info("Starting TaskManagers with command: {}", launchCommand);
+
+ ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
+ ctx.setCommands(Collections.singletonList(launchCommand));
+ ctx.setLocalResources(taskManagerLocalResources);
+
+ Map<String, String> containerEnv = new HashMap<>();
+ containerEnv.putAll(tmParams.taskManagerEnv());
+
+ // add YARN classpath, etc to the container environment
+ containerEnv.put(ENV_FLINK_CLASSPATH, classPathString);
+ Utils.setupYarnClassPath(yarnConfig, containerEnv);
+
+ containerEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName());
+
+ if(remoteKeytabPath != null && remoteKeytabPrincipal != null) {
+ containerEnv.put(YarnConfigKeys.KEYTAB_PATH, remoteKeytabPath);
+ containerEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, remoteKeytabPrincipal);
+ }
+
+ ctx.setEnvironment(containerEnv);
+
+ try (DataOutputBuffer dob = new DataOutputBuffer()) {
+ log.debug("Adding security tokens to Task Executor Container launch Context....");
+ UserGroupInformation user = UserGroupInformation.getCurrentUser();
+ Credentials credentials = user.getCredentials();
+ credentials.writeTokenStorageToStream(dob);
+ ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+ ctx.setTokens(securityTokens);
+ }
+ catch (Throwable t) { // best effort: on failure the context is returned without tokens
+ log.error("Getting current user info failed when trying to launch the container", t);
+ }
+
+ return ctx;
+ }
+}
[07/10] flink git commit: [FLINK-5239] [distributed coordination] RPC service properly unpacks 'InvocationTargetExceptions'
Posted by se...@apache.org.
[FLINK-5239] [distributed coordination] RPC service properly unpacks 'InvocationTargetExceptions'
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/887cbb90
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/887cbb90
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/887cbb90
Branch: refs/heads/flip-6
Commit: 887cbb9095af92e4788c06ba0307cc9db5c5b948
Parents: 44fc46d
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Dec 2 18:49:21 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 5 02:49:43 2016 +0100
----------------------------------------------------------------------
.../flink/runtime/rpc/akka/AkkaRpcActor.java | 16 +++-
.../runtime/rpc/akka/AkkaRpcActorTest.java | 89 +++++++++++++++++++-
2 files changed, 102 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/887cbb90/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index fe6b23b..264ba96 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
@@ -180,8 +181,19 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp
if (rpcMethod.getReturnType().equals(Void.TYPE)) {
// No return value to send back
rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
- } else {
- Object result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
+ }
+ else {
+ final Object result;
+ try {
+ result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
+ }
+ catch (InvocationTargetException e) {
+ LOG.trace("Reporting back error thrown in remote procedure {}", rpcMethod, e);
+
+ // tell the sender about the failure
+ getSender().tell(new Status.Failure(e.getTargetException()), getSelf());
+ return;
+ }
if (result instanceof Future) {
final Future<?> future = (Future<?>) result;
http://git-wip-us.apache.org/repos/asf/flink/blob/887cbb90/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 760e1a7..c73240c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -22,6 +22,7 @@ import akka.actor.ActorSystem;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
@@ -30,6 +31,7 @@ import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.util.TestLogger;
+
import org.hamcrest.core.Is;
import org.junit.AfterClass;
import org.junit.Test;
@@ -86,7 +88,7 @@ public class AkkaRpcActorTest extends TestLogger {
Future<DummyRpcGateway> futureRpcGateway = akkaRpcService.connect("foobar", DummyRpcGateway.class);
try {
- DummyRpcGateway gateway = futureRpcGateway.get(timeout.getSize(), timeout.getUnit());
+ futureRpcGateway.get(timeout.getSize(), timeout.getUnit());
fail("The rpc connection resolution should have failed.");
} catch (ExecutionException exception) {
@@ -192,6 +194,48 @@ public class AkkaRpcActorTest extends TestLogger {
terminationFuture.get();
}
+ @Test
+ public void testExceptionPropagation() throws Exception {
+ final ExceptionalEndpoint endpoint = new ExceptionalEndpoint(akkaRpcService);
+ endpoint.start();
+
+ // the RPC method throws synchronously; the caller must observe that exact exception
+ final Future<Integer> pendingResult = endpoint.getSelf().doStuff();
+
+ try {
+ pendingResult.get(timeout.getSize(), timeout.getUnit());
+ fail("this should fail with an exception");
+ }
+ catch (ExecutionException expected) {
+ final Throwable reported = expected.getCause();
+ assertEquals(RuntimeException.class, reported.getClass());
+ assertEquals("my super specific test exception", reported.getMessage());
+ }
+ }
+
+ @Test
+ public void testExceptionPropagationFuturePiping() throws Exception {
+ final ExceptionalFutureEndpoint endpoint = new ExceptionalFutureEndpoint(akkaRpcService);
+ endpoint.start();
+
+ // the RPC method returns a future that is later completed exceptionally
+ final Future<Integer> pendingResult = endpoint.getSelf().doStuff();
+
+ try {
+ pendingResult.get(timeout.getSize(), timeout.getUnit());
+ fail("this should fail with an exception");
+ }
+ catch (ExecutionException expected) {
+ final Throwable reported = expected.getCause();
+ assertEquals(Exception.class, reported.getClass());
+ assertEquals("some test", reported.getMessage());
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Test Actors and Interfaces
+ // ------------------------------------------------------------------------
+
private interface DummyRpcGateway extends RpcGateway { // test gateway exposing a single asynchronous foobar() RPC
Future<Integer> foobar();
}
@@ -218,4 +262,47 @@ public class AkkaRpcActorTest extends TestLogger {
_foobar = value;
}
}
+
+ // ------------------------------------------------------------------------
+
+ private interface ExceptionalGateway extends RpcGateway { // gateway of ExceptionalEndpoint and ExceptionalFutureEndpoint
+ Future<Integer> doStuff();
+ }
+
+ private static class ExceptionalEndpoint extends RpcEndpoint<ExceptionalGateway> {
+ // the only RPC of this endpoint always fails synchronously
+ protected ExceptionalEndpoint(RpcService service) {
+ super(service);
+ }
+ @RpcMethod
+ public int doStuff() {
+ final RuntimeException failure = new RuntimeException("my super specific test exception");
+ throw failure;
+ }
+ }
+
+ private static class ExceptionalFutureEndpoint extends RpcEndpoint<ExceptionalGateway> {
+
+ protected ExceptionalFutureEndpoint(RpcService rpcService) {
+ super(rpcService);
+ }
+
+ @RpcMethod
+ public Future<Integer> doStuff() {
+ final FlinkCompletableFuture<Integer> future = new FlinkCompletableFuture<>();
+
+ // complete the future slightly in the, well, future, from a separate thread
+ new Thread() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) { Thread.currentThread().interrupt(); } // restore the flag; still fail the future
+ future.completeExceptionally(new Exception("some test"));
+ }
+ }.start();
+
+ return future;
+ }
+ }
}
[06/10] flink git commit: [FLINK-5190] [runtime] fix ZooKeeperLeaderRetrievalService close the zk client when stopping bug
Posted by se...@apache.org.
[FLINK-5190] [runtime] fix ZooKeeperLeaderRetrievalService close the zk client when stopping bug
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e11ea3f5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e11ea3f5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e11ea3f5
Branch: refs/heads/flip-6
Commit: e11ea3f52fc1e1676bfdbb8171c804cfa184f2b0
Parents: 887cbb9
Author: shuai.xus <sh...@alibaba-inc.com>
Authored: Wed Nov 30 17:02:49 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 5 02:49:43 2016 +0100
----------------------------------------------------------------------
.../runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java | 1 -
1 file changed, 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e11ea3f5/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
index 4587bad..f74fb1a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
@@ -96,7 +96,6 @@ public class ZooKeeperLeaderRetrievalService implements LeaderRetrievalService,
client.getConnectionStateListenable().removeListener(connectionStateListener);
cache.close();
- client.close();
}
@Override
[03/10] flink git commit: [FLINK-5238] [minicluster] MiniCluster
starts local communication if only one TaskManager is used
Posted by se...@apache.org.
[FLINK-5238] [minicluster] MiniCluster starts local communication if only one TaskManager is used
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/62e8e33f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/62e8e33f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/62e8e33f
Branch: refs/heads/flip-6
Commit: 62e8e33f341e95b70e090a6d0f7d5e75b9c4d4c9
Parents: 6b3283e
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Dec 2 17:00:25 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 5 02:49:43 2016 +0100
----------------------------------------------------------------------
.../flink/runtime/minicluster/MiniCluster.java | 4 +++-
.../runtime/taskexecutor/TaskManagerRunner.java | 22 +++++++++++++++-----
2 files changed, 20 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/62e8e33f/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 1b9f265..29a6e59 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -525,6 +525,7 @@ public class MiniCluster {
RpcService[] taskManagerRpcServices) throws Exception {
final TaskManagerRunner[] taskManagerRunners = new TaskManagerRunner[numTaskManagers];
+ final boolean localCommunication = numTaskManagers == 1;
for (int i = 0; i < numTaskManagers; i++) {
taskManagerRunners[i] = new TaskManagerRunner(
@@ -532,7 +533,8 @@ public class MiniCluster {
new ResourceID(UUID.randomUUID().toString()),
taskManagerRpcServices[i],
haServices,
- metricRegistry);
+ metricRegistry,
+ localCommunication);
taskManagerRunners[i].start();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/62e8e33f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index a18ff40..1145a46 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -66,11 +66,22 @@ public class TaskManagerRunner implements FatalErrorHandler {
private final TaskExecutor taskManager;
public TaskManagerRunner(
+ Configuration configuration,
+ ResourceID resourceID,
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ MetricRegistry metricRegistry) throws Exception {
+
+ this(configuration, resourceID, rpcService, highAvailabilityServices, metricRegistry, false);
+ }
+
+ public TaskManagerRunner(
Configuration configuration,
ResourceID resourceID,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
- MetricRegistry metricRegistry) throws Exception {
+ MetricRegistry metricRegistry,
+ boolean localCommunicationOnly) throws Exception {
this.configuration = Preconditions.checkNotNull(configuration);
this.resourceID = Preconditions.checkNotNull(resourceID);
@@ -80,10 +91,11 @@ public class TaskManagerRunner implements FatalErrorHandler {
InetAddress remoteAddress = InetAddress.getByName(rpcService.getAddress());
- TaskManagerServicesConfiguration taskManagerServicesConfiguration = TaskManagerServicesConfiguration.fromConfiguration(
- configuration,
- remoteAddress,
- false);
+ TaskManagerServicesConfiguration taskManagerServicesConfiguration =
+ TaskManagerServicesConfiguration.fromConfiguration(
+ configuration,
+ remoteAddress,
+ localCommunicationOnly);
TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
taskManagerServicesConfiguration,
[08/10] flink git commit: [FLINK-5140] [JobManager] SlotPool accepts
allocation requests while ResourceManager is not connected
Posted by se...@apache.org.
[FLINK-5140] [JobManager] SlotPool accepts allocation requests while ResourceManager is not connected
The requests are kept for a certain time and fulfilled once the ResourceManager is connected.
If no ResourceManager is connected in time, the allocation requests are failed.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b3283ec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b3283ec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b3283ec
Branch: refs/heads/flip-6
Commit: 6b3283ecd980e3db5d5b6cca86885d0dfad6e2cd
Parents: 82c1fcf
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Dec 2 16:17:11 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 5 02:49:43 2016 +0100
----------------------------------------------------------------------
.../apache/flink/runtime/instance/SlotPool.java | 76 ++++++++++++--
.../flink/runtime/instance/SlotPoolRpcTest.java | 101 +++++++++++++++++++
.../flink/runtime/instance/SlotPoolTest.java | 27 -----
3 files changed, 166 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6b3283ec/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 65a5c45..1a2adfe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -93,8 +93,6 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
// ------------------------------------------------------------------------
- private final Object lock = new Object();
-
private final JobID jobId;
private final ProviderAndOwner providerAndOwner;
@@ -111,6 +109,9 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
/** All pending requests waiting for slots */
private final HashMap<AllocationID, PendingRequest> pendingRequests;
+ /** The requests that are waiting for the resource manager to be connected */
+ private final HashMap<AllocationID, PendingRequest> waitingForResourceManager;
+
/** Timeout for request calls to the ResourceManager */
private final Time resourceManagerRequestsTimeout;
@@ -154,6 +155,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
this.allocatedSlots = new AllocatedSlots();
this.availableSlots = new AvailableSlots();
this.pendingRequests = new HashMap<>();
+ this.waitingForResourceManager = new HashMap<>();
this.providerAndOwner = new ProviderAndOwner(getSelf(), slotRequestTimeout);
}
@@ -233,6 +235,14 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
public void connectToResourceManager(UUID resourceManagerLeaderId, ResourceManagerGateway resourceManagerGateway) {
this.resourceManagerLeaderId = checkNotNull(resourceManagerLeaderId);
this.resourceManagerGateway = checkNotNull(resourceManagerGateway);
+
+ // work on all slots waiting for this connection
+ for (PendingRequest pending : waitingForResourceManager.values()) {
+ requestSlotFromResourceManager(pending.allocationID(), pending.future(), pending.resourceProfile());
+ }
+
+ // all sent off
+ waitingForResourceManager.clear();
}
@RpcMethod
@@ -273,16 +283,27 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
return FlinkCompletableFuture.completed(slot);
}
- // (2) no slot available, and no resource manager connection
+ // the request will be completed by a future
+ final AllocationID allocationID = new AllocationID();
+ final FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
+
+ // (2) need to request a slot
+
if (resourceManagerGateway == null) {
- return FlinkCompletableFuture.completedExceptionally(
- new NoResourceAvailableException("not connected to ResourceManager and no slot available"));
-
+ // no slot available, and no resource manager connection
+ stashRequestWaitingForResourceManager(allocationID, resources, future);
+ } else {
+ // we have a resource manager connection, so let's ask it for more resources
+ requestSlotFromResourceManager(allocationID, future, resources);
}
- // (3) we have a resource manager connection, so let's ask it for more resources
- final FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
- final AllocationID allocationID = new AllocationID();
+ return future;
+ }
+
+ private void requestSlotFromResourceManager(
+ final AllocationID allocationID,
+ final FlinkCompletableFuture<SimpleSlot> future,
+ final ResourceProfile resources) {
LOG.info("Requesting slot with profile {} from resource manager (request = {}).", resources, allocationID);
@@ -327,8 +348,6 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
return null;
}
}, getMainThreadExecutor());
-
- return future;
}
private void slotRequestToResourceManagerSuccess(final AllocationID allocationID) {
@@ -357,6 +376,32 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
}
}
+ private void stashRequestWaitingForResourceManager(
+ final AllocationID allocationID,
+ final ResourceProfile resources,
+ final FlinkCompletableFuture<SimpleSlot> future) {
+
+ LOG.info("Cannot serve slot request, no ResourceManager connected. " +
+ "Adding as pending request {}", allocationID);
+
+ waitingForResourceManager.put(allocationID, new PendingRequest(allocationID, future, resources));
+
+ scheduleRunAsync(new Runnable() {
+ @Override
+ public void run() {
+ checkTimeoutRequestWaitingForResourceManager(allocationID);
+ }
+ }, resourceManagerRequestsTimeout);
+ }
+
+ private void checkTimeoutRequestWaitingForResourceManager(AllocationID allocationID) {
+ PendingRequest request = waitingForResourceManager.remove(allocationID);
+ if (request != null && !request.future().isDone()) {
+ request.future().completeExceptionally(new NoResourceAvailableException(
+ "No slot available and no connection to Resource Manager established."));
+ }
+ }
+
// ------------------------------------------------------------------------
// Slot releasing & offering
// ------------------------------------------------------------------------
@@ -401,6 +446,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
private PendingRequest pollMatchingPendingRequest(final AllocatedSlot slot) {
final ResourceProfile slotResources = slot.getResourceProfile();
+ // try the requests sent to the resource manager first
for (PendingRequest request : pendingRequests.values()) {
if (slotResources.isMatching(request.resourceProfile())) {
pendingRequests.remove(request.allocationID());
@@ -408,6 +454,14 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
}
}
+ // try the requests waiting for a resource manager connection next
+ for (PendingRequest request : waitingForResourceManager.values()) {
+ if (slotResources.isMatching(request.resourceProfile())) {
+ waitingForResourceManager.remove(request.allocationID());
+ return request;
+ }
+ }
+
// no request pending, or no request matches
return null;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6b3283ec/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
new file mode 100644
index 0000000..89fd22f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.util.clock.SystemClock;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the SlotPool using a proper RPC setup.
+ */
+public class SlotPoolRpcTest {
+
+ /** RPC service shared by all tests of this class. */
+ private static RpcService rpcService;
+
+ // ------------------------------------------------------------------------
+ // setup
+ // ------------------------------------------------------------------------
+
+ @BeforeClass
+ public static void setup() {
+ rpcService = new AkkaRpcService(
+ AkkaUtils.createLocalActorSystem(new Configuration()), Time.seconds(10));
+ }
+
+ @AfterClass
+ public static void shutdown() {
+ rpcService.stopService();
+ }
+
+ // ------------------------------------------------------------------------
+ // tests
+ // ------------------------------------------------------------------------
+
+ /** Without a ResourceManager, a slot request must fail with NoResourceAvailableException instead of hanging. */
+ @Test
+ public void testSlotAllocationNoResourceManager() throws Exception {
+ final Time requestTimeout = Time.milliseconds(100); // this is the timeout for the request tested here
+ final SlotPool pool = new SlotPool(
+ rpcService, new JobID(), SystemClock.getInstance(),
+ Time.days(1), Time.days(1), requestTimeout);
+ pool.start(UUID.randomUUID());
+
+ final Future<SimpleSlot> future =
+ pool.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null);
+
+ try {
+ future.get(4, TimeUnit.SECONDS);
+ }
+ catch (ExecutionException e) {
+ assertEquals(NoResourceAvailableException.class, e.getCause().getClass());
+ return;
+ }
+ catch (TimeoutException e) {
+ fail("future timed out rather than being failed");
+ }
+ catch (Exception e) {
+ fail("wrong exception: " + e);
+ }
+ fail("We expected a ExecutionException.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6b3283ec/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index 5fa7af3..97457e1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.BiFunction;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
@@ -42,17 +41,13 @@ import org.mockito.ArgumentCaptor;
import java.util.List;
import java.util.UUID;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -123,28 +118,6 @@ public class SlotPoolTest extends TestLogger {
}
@Test
- public void testAllocateSlotWithoutResourceManager() throws Exception {
- slotPool.disconnectResourceManager();
- Future<SimpleSlot> future = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
- future.handleAsync(
- new BiFunction<SimpleSlot, Throwable, Void>() {
- @Override
- public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
- assertNull(simpleSlot);
- assertNotNull(throwable);
- return null;
- }
- },
- rpcService.getExecutor());
- try {
- future.get(1, TimeUnit.SECONDS);
- fail("We expected a ExecutionException.");
- } catch (ExecutionException ex) {
- // we expect the exception
- }
- }
-
- @Test
public void testAllocationFulfilledByReturnedSlot() throws Exception {
ResourceID resourceID = new ResourceID("resource");
slotPool.registerTaskManager(resourceID);
[02/10] flink git commit: [hotfix] Improve logging and thread
characteristics for 'EmbeddedNonHaServices'
Posted by se...@apache.org.
[hotfix] Improve logging and thread characteristics for 'EmbeddedNonHaServices'
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/44fc46db
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/44fc46db
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/44fc46db
Branch: refs/heads/flip-6
Commit: 44fc46dba0dcf91ee0f430f1e37f9f28e49ebbc2
Parents: 62e8e33
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Dec 2 17:43:10 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 5 02:49:43 2016 +0100
----------------------------------------------------------------------
.../runtime/highavailability/EmbeddedNonHaServices.java | 7 +++++--
.../highavailability/nonha/AbstractNonHaServices.java | 9 +++++++--
.../highavailability/nonha/EmbeddedLeaderService.java | 5 ++++-
.../src/test/resources/log4j-test.properties | 2 +-
4 files changed, 17 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/44fc46db/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
index 523218e..b91cec1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
@@ -56,7 +56,10 @@ public class EmbeddedNonHaServices extends AbstractNonHaServices implements High
@Override
public void shutdown() throws Exception {
- super.shutdown();
- resourceManagerLeaderService.shutdown();
+ try {
+ super.shutdown();
+ } finally {
+ resourceManagerLeaderService.shutdown();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/44fc46db/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
index 237727f..474faa8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
@@ -55,7 +55,7 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices
private final HashMap<JobID, EmbeddedLeaderService> jobManagerLeaderServices;
- private final RunningJobsRegistry runningJobsRegistry;
+ private final NonHaRegistry runningJobsRegistry;
private boolean shutdown;
@@ -167,8 +167,13 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices
@Override
public Thread newThread(@Nonnull Runnable r) {
- Thread thread = new Thread(r, "Flink HA services thread #" + enumerator.incrementAndGet());
+ Thread thread = new Thread(r, "Flink HA Services Thread #" + enumerator.incrementAndGet());
+
+ // HA threads should have a very high priority, but not
+ // keep the JVM running by themselves
+ thread.setPriority(Thread.MAX_PRIORITY);
thread.setDaemon(true);
+
return thread;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/44fc46db/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
index 84ac551..9fad9be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
@@ -225,7 +225,7 @@ public class EmbeddedLeaderService {
// check if the confirmation is for the same grant, or whether it is a stale grant
if (service == currentLeaderProposed && currentLeaderSessionId.equals(leaderSessionId)) {
final String address = service.contender.getAddress();
- LOG.info("Received confirmation of leadership for leader {} / session={}", address, leaderSessionId);
+ LOG.info("Received confirmation of leadership for leader {} , session={}", address, leaderSessionId);
// mark leadership
currentLeaderConfirmed = service;
@@ -271,6 +271,9 @@ public class EmbeddedLeaderService {
currentLeaderSessionId = leaderSessionId;
currentLeaderProposed = leaderService;
+ LOG.info("Proposing leadership to contender {} @ {}",
+ leaderService.contender, leaderService.contender.getAddress());
+
notificationExecutor.execute(
new GrantLeadershipCall(leaderService.contender, leaderSessionId, LOG));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/44fc46db/flink-streaming-java/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/resources/log4j-test.properties b/flink-streaming-java/src/test/resources/log4j-test.properties
index 881dc06..e7cd3e0 100644
--- a/flink-streaming-java/src/test/resources/log4j-test.properties
+++ b/flink-streaming-java/src/test/resources/log4j-test.properties
@@ -24,4 +24,4 @@ log4j.appender.A1=org.apache.log4j.ConsoleAppender
# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+log4j.appender.A1.layout.ConversionPattern=%-5r [%-38t] %-5p %-60c %x - %m%n
[05/10] flink git commit: [FLINK-5141] [runtime] Add
'waitUntilTaskManagerRegistrationsComplete()' to MiniCluster
Posted by se...@apache.org.
[FLINK-5141] [runtime] Add 'waitUntilTaskManagerRegistrationsComplete()' to MiniCluster
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/82c1fcfa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/82c1fcfa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/82c1fcfa
Branch: refs/heads/flip-6
Commit: 82c1fcfa1f34b963f45146830d51b1490b0dc1e3
Parents: c0086b5
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 30 17:35:47 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 5 02:49:43 2016 +0100
----------------------------------------------------------------------
.../leaderelection/LeaderAddressAndId.java | 73 +++++++++++++++++
.../flink/runtime/minicluster/MiniCluster.java | 58 ++++++++++++-
.../minicluster/MiniClusterJobDispatcher.java | 2 +-
.../OneTimeLeaderListenerFuture.java | 60 ++++++++++++++
.../resourcemanager/ResourceManager.java | 11 +++
.../resourcemanager/ResourceManagerGateway.java | 8 ++
.../runtime/minicluster/MiniClusterITCase.java | 8 ++
.../Flip6LocalStreamEnvironment.java | 23 +++---
.../LocalStreamEnvironmentITCase.java | 81 +++++++++++++++++++
.../flink/core/testutils/CheckedThread.java | 85 ++++++++++++++++++++
10 files changed, 392 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderAddressAndId.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderAddressAndId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderAddressAndId.java
new file mode 100644
index 0000000..23cd34b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderAddressAndId.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A combination of a leader address and leader id.
+ */
+public class LeaderAddressAndId {
+
+ /** The address of the leader. */
+ private final String leaderAddress;
+ /** The id of the leader. */
+ private final UUID leaderId;
+
+ /** Creates the pair. Both arguments must be non-null. */
+ public LeaderAddressAndId(String leaderAddress, UUID leaderId) {
+ this.leaderAddress = checkNotNull(leaderAddress);
+ this.leaderId = checkNotNull(leaderId);
+ }
+
+ /** Returns the address of the leader. */
+ public String leaderAddress() {
+ return leaderAddress;
+ }
+
+ /** Returns the id of the leader. */
+ public UUID leaderId() {
+ return leaderId;
+ }
+
+ @Override
+ public int hashCode() {
+ final int addressHash = leaderAddress.hashCode();
+ return 31 * addressHash + leaderId.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (o == null || o.getClass() != LeaderAddressAndId.class) {
+ return false;
+ }
+ final LeaderAddressAndId other = (LeaderAddressAndId) o;
+ return leaderAddress.equals(other.leaderAddress) && leaderId.equals(other.leaderId);
+ }
+
+ @Override
+ public String toString() {
+ return "LeaderAddressAndId (" + leaderAddress + " / " + leaderId + ')';
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 3ede5b5..1b9f265 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -27,11 +27,15 @@ import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.leaderelection.LeaderAddressAndId;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRunner;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
@@ -169,6 +173,7 @@ public class MiniCluster {
final boolean singleRpc = config.getUseSingleRpcSystem();
try {
+ LOG.info("Starting Metrics Registry");
metricRegistry = createMetricRegistry(configuration);
RpcService[] jobManagerRpcServices = new RpcService[numJobManagers];
@@ -176,10 +181,12 @@ public class MiniCluster {
RpcService[] resourceManagerRpcServices = new RpcService[numResourceManagers];
// bring up all the RPC services
- if (singleRpc) {
- // one common RPC for all
- commonRpcService = createRpcService(configuration, rpcTimeout, false, null);
+ LOG.info("Starting RPC Service(s)");
+
+ // we always need the 'commonRpcService' for auxiliary calls
+ commonRpcService = createRpcService(configuration, rpcTimeout, false, null);
+ if (singleRpc) {
// set that same RPC service for all JobManagers and TaskManagers
for (int i = 0; i < numJobManagers; i++) {
jobManagerRpcServices[i] = commonRpcService;
@@ -236,7 +243,7 @@ public class MiniCluster {
configuration, haServices, metricRegistry, numTaskManagers, taskManagerRpcServices);
// bring up the dispatcher that launches JobManagers when jobs submitted
- LOG.info("Starting job dispatcher for {} JobManger(s)", numJobManagers);
+ LOG.info("Starting job dispatcher(s) for {} JobManger(s)", numJobManagers);
jobDispatcher = new MiniClusterJobDispatcher(
configuration, haServices, metricRegistry, numJobManagers, jobManagerRpcServices);
}
@@ -357,6 +364,49 @@ public class MiniCluster {
}
}
+	/**
+	 * Blocks until the ResourceManager reports at least as many registered TaskManagers as
+	 * this mini cluster has TaskManager runners.
+	 *
+	 * <p>The current ResourceManager leader (address and leader session ID) is obtained through
+	 * the high-availability services, a gateway to it is connected via the common RPC service,
+	 * and the number of registered TaskManagers is then polled every 2 milliseconds. There is
+	 * no timeout: the call returns only once enough TaskManagers are registered, or an
+	 * exception is thrown.
+	 *
+	 * @throws Exception Thrown if the mini cluster is not running, if retrieving the leader,
+	 *                   connecting to the ResourceManager, or polling it fails, or if the
+	 *                   thread is interrupted while sleeping between polls.
+	 */
+	public void waitUntilTaskManagerRegistrationsComplete() throws Exception {
+		LeaderRetrievalService rmMasterListener = null;
+		Future<LeaderAddressAndId> addressAndIdFuture;
+
+		try {
+			synchronized (lock) {
+				checkState(running, "FlinkMiniCluster is not running");
+
+				// the listener completes the future with the leader's address and session ID
+				OneTimeLeaderListenerFuture listenerFuture = new OneTimeLeaderListenerFuture();
+				rmMasterListener = haServices.getResourceManagerLeaderRetriever();
+				rmMasterListener.start(listenerFuture);
+				addressAndIdFuture = listenerFuture.future();
+			}
+
+			// wait for the leader outside the synchronized block, so 'lock' is not held while blocking
+			final LeaderAddressAndId addressAndId = addressAndIdFuture.get();
+
+			final ResourceManagerGateway resourceManager =
+				commonRpcService.connect(addressAndId.leaderAddress(), ResourceManagerGateway.class).get();
+
+			final int numTaskManagersToWaitFor = taskManagerRunners.length;
+
+			// poll and wait until enough TaskManagers are available
+			while (true) {
+				int numTaskManagersAvailable =
+					resourceManager.getNumberOfRegisteredTaskManagers(addressAndId.leaderId()).get();
+
+				if (numTaskManagersAvailable >= numTaskManagersToWaitFor) {
+					break;
+				}
+				Thread.sleep(2);
+			}
+		}
+		finally {
+			if (rmMasterListener != null) {
+				try {
+					rmMasterListener.stop();
+				}
+				catch (Exception e) {
+					// best-effort cleanup: keep the cause in the log, but do not mask the wait's outcome
+					LOG.warn("Error shutting down leader listener for ResourceManager", e);
+				}
+			}
+		}
+	}
+
// ------------------------------------------------------------------------
// running jobs
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
index 8ac8eba..7fffaee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -143,7 +143,7 @@ public class MiniClusterJobDispatcher {
if (!shutdown) {
shutdown = true;
- LOG.info("Shutting down the dispatcher");
+ LOG.info("Shutting down the job dispatcher");
// in this shutdown code we copy the references to the stack first,
// to avoid concurrent modification
http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/OneTimeLeaderListenerFuture.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/OneTimeLeaderListenerFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/OneTimeLeaderListenerFuture.java
new file mode 100644
index 0000000..b0157d8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/OneTimeLeaderListenerFuture.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.leaderelection.LeaderAddressAndId;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+
+import java.util.UUID;
+
+/**
+ * A leader listener that exposes a future for the first leader notification.
+ *
+ * <p>The future can be obtained via the {@link #future()} method.
+ */
+public class OneTimeLeaderListenerFuture implements LeaderRetrievalListener {
+
+ private final FlinkCompletableFuture<LeaderAddressAndId> future;
+
+ public OneTimeLeaderListenerFuture() {
+ this.future = new FlinkCompletableFuture<>();
+ }
+
+ /**
+ * Gets the future that is completed with the leader address and ID.
+ * @return The future.
+ */
+ public FlinkFuture<LeaderAddressAndId> future() {
+ return future;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
+ future.complete(new LeaderAddressAndId(leaderAddress, leaderSessionID));
+ }
+
+ @Override
+ public void handleError(Exception exception) {
+ future.completeExceptionally(exception);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 145cc40..76b4a86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.concurrent.BiFunction;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.LeaderIdMismatchException;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
import org.apache.flink.runtime.leaderelection.LeaderContender;
@@ -502,6 +503,16 @@ public abstract class ResourceManager<WorkerType extends Serializable>
shutDownApplication(finalStatus, optionalDiagnostics);
}
+ @RpcMethod
+ public Integer getNumberOfRegisteredTaskManagers(UUID leaderSessionId) throws LeaderIdMismatchException {
+ if (this.leaderSessionId != null && this.leaderSessionId.equals(leaderSessionId)) {
+ return taskExecutors.size();
+ }
+ else {
+ throw new LeaderIdMismatchException(this.leaderSessionId, leaderSessionId);
+ }
+ }
+
// ------------------------------------------------------------------------
// Testing methods
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 0a37bb9..8235ea7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -122,4 +122,12 @@ public interface ResourceManagerGateway extends RpcGateway {
* @param optionalDiagnostics
*/
void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics);
+
+ /**
+ * Gets the number of TaskManagers currently registered at the ResourceManager.
+ *
+ * <p>The ResourceManager only answers if the given leader session ID equals its current one;
+ * otherwise its implementation throws a {@code LeaderIdMismatchException} (presumably this
+ * surfaces as an exceptionally completed future on the caller side - confirm with the RPC layer).
+ *
+ * @param leaderSessionId The leader session ID with which to address the ResourceManager.
+ * @return The future to the number of registered TaskManagers.
+ */
+ Future<Integer> getNumberOfRegisteredTaskManagers(UUID leaderSessionId);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index 2cf2d4d..d9a1896 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -33,6 +33,10 @@ import org.junit.Test;
*/
public class MiniClusterITCase extends TestLogger {
+ // ------------------------------------------------------------------------
+ // Simple Job Running Tests
+ // ------------------------------------------------------------------------
+
@Test
public void runJobWithSingleRpcService() throws Exception {
MiniClusterConfiguration cfg = new MiniClusterConfiguration();
@@ -63,6 +67,10 @@ public class MiniClusterITCase extends TestLogger {
executeJob(miniCluster);
}
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
private static void executeJob(MiniCluster miniCluster) throws Exception {
miniCluster.start();
http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
index a0c128e..2007d35 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.api.environment;
import org.apache.flink.annotation.Public;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -30,6 +29,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.streaming.api.graph.StreamGraph;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,8 +67,9 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
"The Flip6LocalStreamEnvironment cannot be used when submitting a program through a client, " +
"or running in a TestEnvironment context.");
}
-
+
this.conf = config == null ? new Configuration() : config;
+ setParallelism(1);
}
/**
@@ -85,17 +86,12 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
StreamGraph streamGraph = getStreamGraph();
streamGraph.setJobName(jobName);
- JobGraph jobGraph = streamGraph.getJobGraph();
+ // TODO - temp fix to enforce restarts due to a bug in the allocation protocol
+ streamGraph.getExecutionConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 5));
+ JobGraph jobGraph = streamGraph.getJobGraph();
jobGraph.setAllowQueuedScheduling(true);
- // As jira FLINK-5140 described,
- // we have to set restart strategy to handle NoResourceAvailableException.
- ExecutionConfig executionConfig = new ExecutionConfig();
- executionConfig.setRestartStrategy(
- RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000));
- jobGraph.setExecutionConfig(executionConfig);
-
Configuration configuration = new Configuration();
configuration.addAll(jobGraph.getJobConfiguration());
configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
@@ -105,7 +101,8 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
MiniClusterConfiguration cfg = new MiniClusterConfiguration(configuration);
- // Currently we do not reuse slot anymore, so we need to sum all parallelism of vertices up.
+ // Currently we do not reuse slot anymore,
+ // so we need to sum up the parallelism of all vertices
int slotsCount = 0;
for (JobVertex jobVertex : jobGraph.getVertices()) {
slotsCount += jobVertex.getParallelism();
@@ -119,8 +116,10 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
MiniCluster miniCluster = new MiniCluster(cfg);
try {
miniCluster.start();
+ miniCluster.waitUntilTaskManagerRegistrationsComplete();
return miniCluster.runJobBlocking(jobGraph);
- } finally {
+ }
+ finally {
transformations.clear();
miniCluster.shutdown();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java
new file mode 100644
index 0000000..a360d0e
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.environment;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Integration tests that run small bounded streaming jobs on a {@link Flip6LocalStreamEnvironment}.
+ */
+@SuppressWarnings("serial")
+public class LocalStreamEnvironmentITCase {
+
+	/**
+	 * This test verifies that the execution environment can be used to execute a
+	 * single job with multiple slots.
+	 */
+	@Test
+	public void testRunIsolatedJob() throws Exception {
+		Flip6LocalStreamEnvironment env = new Flip6LocalStreamEnvironment();
+		// a freshly created environment is expected to have a default parallelism of 1
+		assertEquals(1, env.getParallelism());
+
+		addSmallBoundedJob(env, 3);
+		env.execute();
+	}
+
+	/**
+	 * This test verifies that the execution environment can be used to execute multiple
+	 * bounded streaming jobs one after another.
+	 */
+	@Test
+	public void testMultipleJobsAfterAnother() throws Exception {
+		Flip6LocalStreamEnvironment env = new Flip6LocalStreamEnvironment();
+
+		addSmallBoundedJob(env, 3);
+		env.execute();
+
+		addSmallBoundedJob(env, 5);
+		env.execute();
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Adds a small bounded job to the given environment: a sequence source (1 to 100) in slot
+	 * sharing group "group_1", followed by a filter that drops every element, which starts a
+	 * new chain in slot sharing group "group_2", and a print sink. All operators use the given
+	 * parallelism.
+	 */
+	private static void addSmallBoundedJob(StreamExecutionEnvironment env, int parallelism) {
+		DataStream<Long> stream = env
+			.generateSequence(1, 100)
+			.setParallelism(parallelism)
+			.slotSharingGroup("group_1");
+
+		stream
+			.filter(new FilterFunction<Long>() {
+				@Override
+				public boolean filter(Long value) {
+					return false;
+				}
+			})
+			.setParallelism(parallelism)
+			.startNewChain()
+			.slotSharingGroup("group_2")
+
+			.print()
+			.setParallelism(parallelism);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
new file mode 100644
index 0000000..aedbb5c
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.testutils;
+
+/**
+ * A thread that additionally catches exceptions and offers a joining method that
+ * re-throws the exceptions.
+ *
+ * <p>Rather than overriding {@link Thread#run()} (or supplying a {@link Runnable}), one
+ * needs to extend this class and implement the {@link #go()} method. That method may
+ * throw exceptions.
+ *
+ * <p>Exceptions from the {@link #go()} method are caught and re-thrown when joining this
+ * thread via the {@link #sync()} method.
+ */
+public abstract class CheckedThread extends Thread {
+
+	/** The error thrown from the main work method. */
+	private volatile Throwable error;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new thread with the default name assigned by {@link Thread#Thread()}.
+	 */
+	public CheckedThread() {
+		super();
+	}
+
+	/**
+	 * Creates a new thread with the given name, which makes it easier to identify,
+	 * for example in thread dumps.
+	 *
+	 * @param name The name of the new thread.
+	 */
+	public CheckedThread(final String name) {
+		super(name);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This method needs to be overridden to contain the main work logic.
+	 * It takes the role of {@link Thread#run()}, but should propagate exceptions.
+	 *
+	 * @throws Exception The exceptions thrown here will be re-thrown in the {@link #sync()} method.
+	 */
+	public abstract void go() throws Exception;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This method is final - thread work should go into the {@link #go()} method instead.
+	 */
+	@Override
+	public final void run() {
+		try {
+			go();
+		}
+		catch (Throwable t) {
+			// remember the failure so that sync() can re-throw it in the joining thread
+			error = t;
+		}
+	}
+
+	/**
+	 * Waits until the thread is completed and checks whether any error occurred during
+	 * the execution.
+	 *
+	 * <p>This method blocks like {@link #join()}, but performs an additional check for
+	 * exceptions thrown from the {@link #go()} method.
+	 *
+	 * @throws InterruptedException If the calling thread is interrupted while joining.
+	 * @throws Exception The exception thrown by {@link #go()}; a throwable that is neither an
+	 *                   {@link Exception} nor an {@link Error} is wrapped in an {@link Exception}.
+	 *                   An {@link Error} thrown by {@link #go()} is re-thrown as is.
+	 */
+	public void sync() throws Exception {
+		super.join();
+
+		// propagate the error
+		if (error != null) {
+			if (error instanceof Error) {
+				throw (Error) error;
+			}
+			else if (error instanceof Exception) {
+				throw (Exception) error;
+			}
+			else {
+				throw new Exception(error.getMessage(), error);
+			}
+		}
+	}
+}
[09/10] flink git commit: [FLINK-4928] [yarn] Implement FLIP-6 YARN
Application Master Runner
Posted by se...@apache.org.
[FLINK-4928] [yarn] Implement FLIP-6 YARN Application Master Runner
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e8293bcb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e8293bcb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e8293bcb
Branch: refs/heads/flip-6
Commit: e8293bcba588296656ae8425506bd2edf94a70e4
Parents: 8e57fba
Author: shuai.xus <sh...@alibaba-inc.com>
Authored: Thu Nov 3 16:24:47 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 5 02:49:44 2016 +0100
----------------------------------------------------------------------
...bstractYarnFlinkApplicationMasterRunner.java | 213 +++++++++++++
.../yarn/YarnFlinkApplicationMasterRunner.java | 316 +++++++++++++++++++
2 files changed, 529 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e8293bcb/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
new file mode 100644
index 0000000..923694e
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Map;
+
+/**
+ * Base class for the executable entry point of the YARN application master.
+ *
+ * <p>{@link #run(String[])} validates the YARN-provided environment, sets up the security
+ * context (keytab, Kerberos configuration), builds the Flink configuration, and then calls
+ * {@link #runApplicationMaster(Configuration)} inside the installed security context.
+ * Subclasses are expected to start the actual components there, such as the
+ * {@link org.apache.flink.runtime.jobmaster.JobMaster} and the {@link YarnResourceManager}.
+ *
+ * <p>The JobMasters handle Flink job execution, while the YarnResourceManager handles container
+ * allocation and failure detection.
+ */
+public abstract class AbstractYarnFlinkApplicationMasterRunner {
+
+	/** Logger. */
+	protected static final Logger LOG = LoggerFactory.getLogger(AbstractYarnFlinkApplicationMasterRunner.class);
+
+	/** The process environment variables. */
+	protected static final Map<String, String> ENV = System.getenv();
+
+	/** The exit code returned if the initialization of the application master failed. */
+	protected static final int INIT_ERROR_EXIT_CODE = 31;
+
+	/** The host name passed by YARN through the environment (NM_HOST). */
+	protected String appMasterHostname;
+
+	/**
+	 * The instance entry point for the YARN application master. Obtains user group
+	 * information and calls the main work method {@link #runApplicationMaster(org.apache.flink.configuration.Configuration)} as a
+	 * privileged action.
+	 *
+	 * @param args The command line arguments (not read by this method).
+	 * @return The process exit code; {@link #INIT_ERROR_EXIT_CODE} if anything fails.
+	 */
+	protected int run(String[] args) {
+		try {
+			LOG.debug("All environment variables: {}", ENV);
+
+			final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+			// Preconditions messages use '%s' placeholders, like the other checks in this method
+			Preconditions.checkArgument(yarnClientUsername != null,
+				"YARN client user name environment variable %s not set",
+				YarnConfigKeys.ENV_HADOOP_USER_NAME);
+
+			final String currDir = ENV.get(Environment.PWD.key());
+			Preconditions.checkArgument(currDir != null, "Current working directory variable (%s) not set", Environment.PWD.key());
+			LOG.debug("Current working directory: {}", currDir);
+
+			final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH);
+			LOG.debug("Remote keytab path obtained {}", remoteKeytabPath);
+
+			final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+			LOG.info("Remote keytab principal obtained {}", remoteKeytabPrincipal);
+
+			// if a remote keytab path is set, the local keytab is assumed to be in the
+			// working directory under Utils.KEYTAB_FILE_NAME
+			String keytabPath = null;
+			if (remoteKeytabPath != null) {
+				File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
+				keytabPath = f.getAbsolutePath();
+				LOG.debug("Keytab path: {}", keytabPath);
+			}
+
+			UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+
+			LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
+				currentUser.getShortUserName(), yarnClientUsername);
+
+			SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration();
+
+			// To support Yarn Secure Integration Test Scenario
+			File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
+			if (krb5Conf.exists() && krb5Conf.canRead()) {
+				String krb5Path = krb5Conf.getAbsolutePath();
+				LOG.info("KRB5 Conf: {}", krb5Path);
+				org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+				conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+				conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
+				sc.setHadoopConfiguration(conf);
+			}
+
+			// Flink configuration
+			final Map<String, String> dynamicProperties =
+				FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
+			LOG.debug("YARN dynamic properties: {}", dynamicProperties);
+
+			final Configuration flinkConfig = createConfiguration(currDir, dynamicProperties);
+			// the keytab is only configured when both the file and the principal are known
+			if (keytabPath != null && remoteKeytabPrincipal != null) {
+				flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
+				flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
+			}
+
+			SecurityContext.install(sc.setFlinkConfiguration(flinkConfig));
+
+			// Note that we use the "appMasterHostname" given by YARN here, to make sure
+			// we use the hostnames given by YARN consistently throughout akka.
+			// for akka "localhost" and "localhost.localdomain" are different actors.
+			this.appMasterHostname = ENV.get(Environment.NM_HOST.key());
+			Preconditions.checkArgument(appMasterHostname != null,
+				"ApplicationMaster hostname variable %s not set", Environment.NM_HOST.key());
+			LOG.info("YARN assigned hostname for application master: {}", appMasterHostname);
+
+			return SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
+				@Override
+				public Integer run() {
+					return runApplicationMaster(flinkConfig);
+				}
+			});
+
+		}
+		catch (Throwable t) {
+			// make sure that whatever happens ends up in the log
+			LOG.error("YARN Application Master initialization failed", t);
+			return INIT_ERROR_EXIT_CODE;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Core work method
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The main work method, must run as a privileged action.
+	 *
+	 * @param config The Flink configuration assembled by {@link #run(String[])}.
+	 * @return The return code for the Java process.
+	 */
+	protected abstract int runApplicationMaster(Configuration config);
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Loads the Flink configuration from the given directory and applies the application
+	 * master specific overrides: the base directory, the dynamic properties, the ZooKeeper
+	 * namespace from the environment, the web monitor port, and the replacement of deprecated
+	 * YARN-specific keys by their generic counterparts.
+	 *
+	 * @param baseDirectory The working directory
+	 * @param additional Additional parameters (dynamic properties) that override loaded values
+	 *
+	 * @return The configuration handed to {@link #runApplicationMaster(Configuration)}.
+	 */
+	private static Configuration createConfiguration(String baseDirectory, Map<String, String> additional) {
+		LOG.info("Loading config from directory {}.", baseDirectory);
+
+		Configuration configuration = GlobalConfiguration.loadConfiguration(baseDirectory);
+
+		configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, baseDirectory);
+
+		// add dynamic properties to JobManager configuration.
+		for (Map.Entry<String, String> property : additional.entrySet()) {
+			configuration.setString(property.getKey(), property.getValue());
+		}
+
+		// override zookeeper namespace with user cli argument (if provided)
+		String cliZKNamespace = ENV.get(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE);
+		if (cliZKNamespace != null && !cliZKNamespace.isEmpty()) {
+			configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, cliZKNamespace);
+		}
+
+		// if a web monitor shall be started, set the port to random binding
+		if (configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
+			configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
+		}
+
+		// if the user has set the deprecated YARN-specific config keys, we add the
+		// corresponding generic config keys instead. that way, later code needs not
+		// deal with deprecated config keys
+
+		BootstrapTools.substituteDeprecatedConfigKey(configuration,
+			ConfigConstants.YARN_HEAP_CUTOFF_RATIO,
+			ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO);
+
+		BootstrapTools.substituteDeprecatedConfigKey(configuration,
+			ConfigConstants.YARN_HEAP_CUTOFF_MIN,
+			ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN);
+
+		BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
+			ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX,
+			ConfigConstants.CONTAINERIZED_MASTER_ENV_PREFIX);
+
+		BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
+			ConfigConstants.YARN_TASK_MANAGER_ENV_PREFIX,
+			ConfigConstants.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);
+
+		return configuration;
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e8293bcb/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
new file mode 100644
index 0000000..e58c77e
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.ObjectInputStream;
+
+/**
+ * This class is the executable entry point for the YARN application master.
+ * It starts the actor system and the actors for the {@link org.apache.flink.runtime.jobmaster.JobManagerRunner}
+ * and the {@link org.apache.flink.yarn.YarnResourceManager}.
+ *
+ * <p>The JobManagerRunner starts a {@link org.apache.flink.runtime.jobmaster.JobMaster}.
+ * The JobMaster handles Flink job execution, while the YarnResourceManager handles container
+ * allocation and failure detection.
+ *
+ * <p>When the job finishes or fails, or a fatal error occurs, {@link #shutdown(ApplicationStatus, String)}
+ * tears down all services and passes the final status to the resource manager.
+ */
+public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicationMasterRunner
+		implements OnCompletionActions, FatalErrorHandler {
+
+	/** Logger */
+	protected static final Logger LOG = LoggerFactory.getLogger(YarnFlinkApplicationMasterRunner.class);
+
+	/** The config key under which the path of the serialized job graph file is stored */
+	private static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
+
+	/** The lock to guard startup / shutdown / manipulation methods */
+	private final Object lock = new Object();
+
+	@GuardedBy("lock")
+	private MetricRegistry metricRegistry;
+
+	@GuardedBy("lock")
+	private HighAvailabilityServices haServices;
+
+	/** RPC service shared by the resource manager and the job manager runner */
+	@GuardedBy("lock")
+	private RpcService commonRpcService;
+
+	@GuardedBy("lock")
+	private ResourceManager resourceManager;
+
+	@GuardedBy("lock")
+	private JobManagerRunner jobManagerRunner;
+
+	@GuardedBy("lock")
+	private JobGraph jobGraph;
+
+	// ------------------------------------------------------------------------
+	//  Program entry point
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The entry point for the YARN application master.
+	 *
+	 * @param args The command line arguments.
+	 */
+	public static void main(String[] args) {
+		EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster runner", args);
+		SignalHandler.register(LOG);
+		JvmShutdownSafeguard.installAsShutdownHook(LOG);
+
+		// run and exit with the proper return code
+		int returnCode = new YarnFlinkApplicationMasterRunner().run(args);
+		System.exit(returnCode);
+	}
+
+	/**
+	 * Creates and starts the common services, the resource manager and the job manager runner,
+	 * then blocks until the resource manager has terminated.
+	 *
+	 * @param config The Flink configuration of the application master.
+	 * @return {@code 0} once the resource manager terminated, or {@link #INIT_ERROR_EXIT_CODE}
+	 *         if anything failed (in which case everything is shut down with status FAILED).
+	 */
+	@Override
+	protected int runApplicationMaster(Configuration config) {
+
+		try {
+			// ---- (1) create common services
+
+			// try to start the rpc service
+			// using the port range definition from the config.
+			final String amPortRange = config.getString(
+					ConfigConstants.YARN_APPLICATION_MASTER_PORT,
+					ConfigConstants.DEFAULT_YARN_JOB_MANAGER_PORT);
+
+			synchronized (lock) {
+				haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config);
+				metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+				commonRpcService = createRpcService(config, appMasterHostname, amPortRange);
+
+				// ---- (2) init resource manager -------
+				resourceManager = createResourceManager(config);
+
+				// ---- (3) init job master parameters
+				jobManagerRunner = createJobManagerRunner(config);
+
+				// ---- (4) start the resource manager and job manager runner:
+				resourceManager.start();
+				LOG.debug("YARN Flink Resource Manager started");
+
+				jobManagerRunner.start();
+				LOG.debug("Job Manager Runner started");
+
+				// ---- (5) start the web monitor
+				// TODO: add web monitor
+			}
+
+			// everything started, we can wait until all is done or the process is killed:
+			// block until the resource manager has terminated (shutdown() stops it once the
+			// job finished, failed, or a fatal error occurred)
+			resourceManager.getTerminationFuture().get();
+			LOG.info("YARN Application Master finished");
+		}
+		catch (Throwable t) {
+			// make sure that everything whatever ends up in the log
+			LOG.error("YARN Application Master initialization failed", t);
+			shutdown(ApplicationStatus.FAILED, t.getMessage());
+			return INIT_ERROR_EXIT_CODE;
+		}
+
+		return 0;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Starts an actor system on the given address / port range and wraps it in an Akka based RPC service.
+	 *
+	 * @param configuration The configuration (also used for the Akka ask timeout).
+	 * @param bindAddress The hostname / address to bind to.
+	 * @param portRange The port range definition to try.
+	 * @return The started RPC service.
+	 * @throws Exception If the actor system could not be started.
+	 */
+	protected RpcService createRpcService(
+			Configuration configuration,
+			String bindAddress,
+			String portRange) throws Exception {
+		ActorSystem actorSystem = BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG);
+		FiniteDuration duration = AkkaUtils.getTimeout(configuration);
+		return new AkkaRpcService(actorSystem, Time.of(duration.length(), duration.unit()));
+	}
+
+	/**
+	 * Creates (but does not start) the YARN resource manager on the common RPC service.
+	 * Must be called after the HA services, metric registry and RPC service were created.
+	 */
+	private ResourceManager createResourceManager(Configuration config) throws ConfigurationException {
+		final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(config);
+		final SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory();
+		final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(haServices);
+
+		return new YarnResourceManager(config,
+				ENV,
+				commonRpcService,
+				resourceManagerConfiguration,
+				haServices,
+				slotManagerFactory,
+				metricRegistry,
+				jobLeaderIdService,
+				this);
+	}
+
+	/**
+	 * Loads the job graph, marks the job as running in the HA services, and creates
+	 * (but does not start) the job manager runner for it.
+	 */
+	private JobManagerRunner createJobManagerRunner(Configuration config) throws Exception {
+		// first get JobGraph from local resources
+		//TODO: generate the job graph from user's jar
+		jobGraph = loadJobGraph(config);
+
+		// we first need to mark the job as running in the HA services, so that the
+		// JobManager leader will recognize that it as work to do
+		try {
+			haServices.getRunningJobsRegistry().setJobRunning(jobGraph.getJobID());
+		}
+		catch (Throwable t) {
+			throw new JobExecutionException(jobGraph.getJobID(),
+					"Could not register the job at the high-availability services", t);
+		}
+
+		// now the JobManagerRunner
+		return new JobManagerRunner(
+				jobGraph, config,
+				commonRpcService,
+				haServices,
+				this,
+				this);
+	}
+
+	/**
+	 * Shuts down all services that have been created so far, in reverse order of dependency:
+	 * job manager runner, resource manager (after reporting the final status), the common RPC
+	 * service, HA services and the metric registry. Failures of individual steps are logged
+	 * and do not prevent the remaining steps.
+	 *
+	 * @param status The final application status passed to the resource manager.
+	 * @param msg An optional diagnostics message (may be {@code null}).
+	 */
+	protected void shutdown(ApplicationStatus status, String msg) {
+		synchronized (lock) {
+			if (jobManagerRunner != null) {
+				try {
+					jobManagerRunner.shutdown();
+				} catch (Throwable tt) {
+					LOG.warn("Failed to stop the JobManagerRunner", tt);
+				}
+			}
+			if (resourceManager != null) {
+				try {
+					resourceManager.shutDownCluster(status, msg);
+					resourceManager.shutDown();
+				} catch (Throwable tt) {
+					LOG.warn("Failed to stop the ResourceManager", tt);
+				}
+			}
+			if (commonRpcService != null) {
+				try {
+					commonRpcService.stopService();
+				} catch (Throwable tt) {
+					// this rpc service is shared by the resource manager and the job manager runner
+					LOG.error("Error shutting down the common rpc service", tt);
+				}
+			}
+			if (haServices != null) {
+				try {
+					haServices.shutdown();
+				} catch (Throwable tt) {
+					LOG.warn("Failed to stop the HA service", tt);
+				}
+			}
+			if (metricRegistry != null) {
+				try {
+					metricRegistry.shutdown();
+				} catch (Throwable tt) {
+					LOG.warn("Failed to stop the metrics registry", tt);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Loads the Java-serialized {@link JobGraph} from the file whose path is configured under
+	 * {@link #JOB_GRAPH_FILE_PATH} (default {@code job.graph}, resolved against the working directory).
+	 *
+	 * <p>NOTE(review): this uses Java native deserialization. The file is presumably shipped by the
+	 * client that deployed the application; it must never come from an untrusted source.
+	 *
+	 * @param config The configuration to read the job graph path from.
+	 * @return The deserialized job graph.
+	 * @throws Exception If the path is not a regular file or the job graph cannot be read.
+	 */
+	private static JobGraph loadJobGraph(Configuration config) throws Exception {
+		JobGraph jg = null;
+		String jobGraphFile = config.getString(JOB_GRAPH_FILE_PATH, "job.graph");
+		if (jobGraphFile != null) {
+			File fp = new File(jobGraphFile);
+			if (fp.isFile()) {
+				// try-with-resources closes both streams even if creating the ObjectInputStream
+				// or deserializing the object fails
+				try (FileInputStream input = new FileInputStream(fp);
+						ObjectInputStream obInput = new ObjectInputStream(input)) {
+					jg = (JobGraph) obInput.readObject();
+				}
+			}
+		}
+		if (jg == null) {
+			throw new Exception("Fail to load job graph " + jobGraphFile);
+		}
+		return jg;
+	}
+
+	//-------------------------------------------------------------------------------------
+	// Fatal error handler
+	//-------------------------------------------------------------------------------------
+
+	/**
+	 * Logs the fatal error and shuts everything down with status FAILED.
+	 */
+	@Override
+	public void onFatalError(Throwable exception) {
+		LOG.error("Encountered fatal error.", exception);
+
+		shutdown(ApplicationStatus.FAILED, exception.getMessage());
+	}
+
+	//----------------------------------------------------------------------------------------------
+	// Result and error handling methods
+	//----------------------------------------------------------------------------------------------
+
+	/**
+	 * Job completion notification triggered by the JobManager; shuts down with status SUCCEEDED.
+	 */
+	@Override
+	public void jobFinished(JobExecutionResult result) {
+		shutdown(ApplicationStatus.SUCCEEDED, null);
+	}
+
+	/**
+	 * Job failure notification triggered by the JobManager; shuts down with status FAILED.
+	 */
+	@Override
+	public void jobFailed(Throwable cause) {
+		shutdown(ApplicationStatus.FAILED, cause.getMessage());
+	}
+
+	/**
+	 * Notification that the job was finished by a party other than this runner's JobManager
+	 * (NOTE(review): inferred from the callback name, confirm against OnCompletionActions).
+	 * The outcome is not known here, so everything is shut down with status UNKNOWN.
+	 */
+	@Override
+	public void jobFinishedByOther() {
+		shutdown(ApplicationStatus.UNKNOWN, null);
+	}
+}
[10/10] flink git commit: [FLINK-4929] [yarn] Implement FLIP-6 YARN
TaskExecutor Runner
Posted by se...@apache.org.
[FLINK-4929] [yarn] Implement FLIP-6 YARN TaskExecutor Runner
Summary: Implement FLIP-6 YARN TaskExecutor Runner
Test Plan: NA
Reviewers: biao.liub
Differential Revision: http://phabricator.taobao.net/D6564
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/55e94c3c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/55e94c3c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/55e94c3c
Branch: refs/heads/flip-6
Commit: 55e94c3c655dc73beaebfd13b83531194f0ae539
Parents: e8293bc
Author: shuai.xus <sh...@alibaba-inc.com>
Authored: Wed Nov 23 18:00:07 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 5 02:49:44 2016 +0100
----------------------------------------------------------------------
.../runtime/taskexecutor/TaskManagerRunner.java | 6 +
.../flink/yarn/YarnTaskExecutorRunner.java | 257 +++++++++++++++++++
2 files changed, 263 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/55e94c3c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 1145a46..3500f6d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
@@ -151,6 +152,11 @@ public class TaskManagerRunner implements FatalErrorHandler {
}
}
+	/**
+	 * Exposes the termination future of the wrapped task manager, so that callers
+	 * can find out when it has terminated.
+	 *
+	 * @return the task manager's termination future
+	 */
+	public Future<Void> getTerminationFuture() {
+		final Future<Void> terminationFuture = taskManager.getTerminationFuture();
+		return terminationFuture;
+	}
+
// --------------------------------------------------------------------------------------------
// FatalErrorHandler methods
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/55e94c3c/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
new file mode 100644
index 0000000..d9912eb
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Map;
+
+/**
+ * This class is the executable entry point for running a TaskExecutor in a YARN container.
+ *
+ * <p>It reads the container environment and the Flink configuration from the working directory,
+ * installs the security context, and then runs a {@link TaskManagerRunner} until it terminates.
+ */
+public class YarnTaskExecutorRunner {
+
+	/** Logger */
+	protected static final Logger LOG = LoggerFactory.getLogger(YarnTaskExecutorRunner.class);
+
+	/** The process environment variables */
+	private static final Map<String, String> ENV = System.getenv();
+
+	/** The exit code returned if the initialization of the yarn task executor runner failed */
+	private static final int INIT_ERROR_EXIT_CODE = 31;
+
+	private MetricRegistry metricRegistry;
+
+	private HighAvailabilityServices haServices;
+
+	private RpcService taskExecutorRpcService;
+
+	private TaskManagerRunner taskManagerRunner;
+
+	// ------------------------------------------------------------------------
+	//  Program entry point
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The entry point for the YARN task executor runner.
+	 *
+	 * @param args The command line arguments.
+	 */
+	public static void main(String[] args) {
+		EnvironmentInformation.logEnvironmentInfo(LOG, "YARN TaskExecutor runner", args);
+		SignalHandler.register(LOG);
+		JvmShutdownSafeguard.installAsShutdownHook(LOG);
+
+		// run and exit with the proper return code
+		int returnCode = new YarnTaskExecutorRunner().run(args);
+		System.exit(returnCode);
+	}
+
+	/**
+	 * The instance entry point for the YARN task executor. Obtains user group
+	 * information and calls the main work method {@link #runTaskExecutor(org.apache.flink.configuration.Configuration)} as a
+	 * privileged action.
+	 *
+	 * @param args The command line arguments.
+	 * @return The process exit code.
+	 */
+	protected int run(String[] args) {
+		try {
+			LOG.debug("All environment variables: {}", ENV);
+
+			final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+			final String localDirs = ENV.get(Environment.LOCAL_DIRS.key());
+			LOG.info("Current working/local Directory: {}", localDirs);
+
+			final String currDir = ENV.get(Environment.PWD.key());
+			LOG.info("Current working Directory: {}", currDir);
+
+			final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH);
+			LOG.info("TM: remote keytab path obtained {}", remoteKeytabPath);
+
+			final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+			LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
+
+			final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir);
+			FileSystem.setDefaultScheme(configuration);
+
+			// configure local directory: use YARN's local dirs unless Flink's config overrides them
+			String flinkTempDirs = configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null);
+			if (flinkTempDirs == null) {
+				LOG.info("Setting directories for temporary file {}", localDirs);
+				configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, localDirs);
+			}
+			else {
+				LOG.info("Overriding YARN's temporary file directories with those " +
+						"specified in the Flink config: {}", flinkTempDirs);
+			}
+
+			// tell akka to die in case of an error
+			configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true);
+
+			// the keytab (if any) is expected in the container's working directory
+			String keytabPath = null;
+			if (remoteKeytabPath != null) {
+				File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
+				keytabPath = f.getAbsolutePath();
+				LOG.info("keytab path: {}", keytabPath);
+			}
+
+			UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+
+			LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
+					currentUser.getShortUserName(), yarnClientUsername);
+
+			SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration();
+
+			//To support Yarn Secure Integration Test Scenario
+			File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
+			if (krb5Conf.exists() && krb5Conf.canRead()) {
+				String krb5Path = krb5Conf.getAbsolutePath();
+				LOG.info("KRB5 Conf: {}", krb5Path);
+				org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+				conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+				conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
+				sc.setHadoopConfiguration(conf);
+			}
+
+			if (keytabPath != null && remoteKeytabPrincipal != null) {
+				configuration.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
+				configuration.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
+			}
+
+			SecurityContext.install(sc.setFlinkConfiguration(configuration));
+
+			return SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
+				@Override
+				public Integer run() {
+					return runTaskExecutor(configuration);
+				}
+			});
+
+		}
+		catch (Throwable t) {
+			// make sure that everything whatever ends up in the log
+			LOG.error("YARN TaskExecutor initialization failed", t);
+			return INIT_ERROR_EXIT_CODE;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Core work method
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The main work method, must run as a privileged action.
+	 *
+	 * <p>Creates the HA services, metric registry and RPC service, starts the task manager runner,
+	 * and blocks until it has terminated. All services are shut down before returning.
+	 *
+	 * @param config The Flink configuration (the task manager hostname may be set in it).
+	 * @return The return code for the Java process.
+	 */
+	protected int runTaskExecutor(Configuration config) {
+
+		try {
+			// ---- (1) create common services
+			// first get the ResouceId, resource id is the container id for yarn.
+			final String containerId = ENV.get(YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID);
+			Preconditions.checkArgument(containerId != null,
+					"ContainerId variable %s not set", YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID);
+			// use the hostname passed by job manager
+			final String taskExecutorHostname = ENV.get(YarnResourceManager.ENV_FLINK_NODE_ID);
+			if (taskExecutorHostname != null) {
+				config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, taskExecutorHostname);
+			}
+
+			ResourceID resourceID = new ResourceID(containerId);
+			LOG.info("YARN assigned resource id {} for the task executor.", resourceID);
+
+			haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config);
+			metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+
+			// ---- (2) init task manager runner -------
+			taskExecutorRpcService = TaskManagerRunner.createRpcService(config, haServices);
+			taskManagerRunner = new TaskManagerRunner(config, resourceID, taskExecutorRpcService, haServices, metricRegistry);
+
+			// ---- (3) start the task manager runner
+			taskManagerRunner.start();
+			LOG.debug("YARN task executor started");
+
+			// everything started, we can wait until all is done or the process is killed
+			taskManagerRunner.getTerminationFuture().get();
+			LOG.info("YARN task manager runner finished");
+			shutdown();
+		}
+		catch (Throwable t) {
+			// make sure that everything whatever ends up in the log
+			LOG.error("YARN task executor initialization failed", t);
+			shutdown();
+			return INIT_ERROR_EXIT_CODE;
+		}
+
+		return 0;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Stops the RPC service, the HA services and the metric registry (whichever were created).
+	 * Failures of individual steps are logged and do not prevent the remaining steps.
+	 */
+	protected void shutdown() {
+		if (taskExecutorRpcService != null) {
+			try {
+				taskExecutorRpcService.stopService();
+			} catch (Throwable tt) {
+				LOG.error("Error shutting down task executor rpc service", tt);
+			}
+		}
+		if (haServices != null) {
+			try {
+				haServices.shutdown();
+			} catch (Throwable tt) {
+				LOG.warn("Failed to stop the HA service", tt);
+			}
+		}
+		if (metricRegistry != null) {
+			try {
+				metricRegistry.shutdown();
+			} catch (Throwable tt) {
+				LOG.warn("Failed to stop the metrics registry", tt);
+			}
+		}
+	}
+
+}
[04/10] flink git commit: [FLINK-5141] [streaming api] Implement
LocalStreamEnvironment for new mini cluster.
Posted by se...@apache.org.
[FLINK-5141] [streaming api] Implement LocalStreamEnvironment for new mini cluster.
This closes #2877
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c0086b57
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c0086b57
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c0086b57
Branch: refs/heads/flip-6
Commit: c0086b57eec63bab627383205eed2ff8636c5394
Parents: 4afcc4a
Author: biao.liub <bi...@alibaba-inc.com>
Authored: Wed Nov 23 17:02:11 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 5 02:49:43 2016 +0100
----------------------------------------------------------------------
.../Flip6LocalStreamEnvironment.java | 128 +++++++++++++++++++
1 file changed, 128 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c0086b57/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
new file mode 100644
index 0000000..a0c128e
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.environment;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Flip6LocalStreamEnvironment is a StreamExecutionEnvironment that runs the program locally,
+ * multi-threaded, in the JVM where the environment is instantiated. It spawns an embedded
+ * Flink cluster in the background and executes the program on that cluster.
+ *
+ * <p>The default parallelism can be set via {@link #setParallelism(int)}.
+ */
+@Public
+public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
+
+	private static final Logger LOG = LoggerFactory.getLogger(Flip6LocalStreamEnvironment.class);
+
+	/** The configuration to use for the mini cluster */
+	private final Configuration conf;
+
+	/**
+	 * Creates a new mini cluster stream environment that uses the default configuration.
+	 */
+	public Flip6LocalStreamEnvironment() {
+		this(null);
+	}
+
+	/**
+	 * Creates a new mini cluster stream environment that configures its local executor with the given configuration.
+	 *
+	 * @param config The configuration used to configure the local executor ({@code null} means an empty configuration).
+	 * @throws InvalidProgramException If explicit environments are not allowed in the current context.
+	 */
+	public Flip6LocalStreamEnvironment(Configuration config) {
+		if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
+			throw new InvalidProgramException(
+					"The Flip6LocalStreamEnvironment cannot be used when submitting a program through a client, " +
+					"or running in a TestEnvironment context.");
+		}
+
+		this.conf = config == null ? new Configuration() : config;
+	}
+
+	/**
+	 * Executes the JobGraph of this environment's streaming program on a freshly started mini
+	 * cluster, using the given job name. The mini cluster is shut down and the registered
+	 * transformations are cleared afterwards, whether the execution succeeded or not.
+	 *
+	 * @param jobName
+	 *        name of the job
+	 * @return The result of the job execution, containing elapsed time and accumulators.
+	 */
+	@Override
+	public JobExecutionResult execute(String jobName) throws Exception {
+		// transform the streaming program into a JobGraph
+		StreamGraph streamGraph = getStreamGraph();
+		streamGraph.setJobName(jobName);
+
+		JobGraph jobGraph = streamGraph.getJobGraph();
+
+		jobGraph.setAllowQueuedScheduling(true);
+
+		// As jira FLINK-5140 described,
+		// we have to set restart strategy to handle NoResourceAvailableException.
+		// Use this environment's own ExecutionConfig rather than a fresh one, so that the user's
+		// settings (registered types, global job parameters, object reuse, ...) are not lost.
+		// Note: this overrides any restart strategy configured on this environment.
+		ExecutionConfig executionConfig = getConfig();
+		executionConfig.setRestartStrategy(
+				RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000));
+		jobGraph.setExecutionConfig(executionConfig);
+
+		Configuration configuration = new Configuration();
+		configuration.addAll(jobGraph.getJobConfiguration());
+		configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
+
+		// add (and override) the settings with what the user defined
+		configuration.addAll(this.conf);
+
+		MiniClusterConfiguration cfg = new MiniClusterConfiguration(configuration);
+
+		// Currently we do not reuse slot anymore, so we need to sum all parallelism of vertices up.
+		// NOTE(review): a vertex without an explicitly set parallelism presumably reports a
+		// non-positive default (-1); count it as at least one slot so the sum cannot under-count
+		// or become negative.
+		int slotsCount = 0;
+		for (JobVertex jobVertex : jobGraph.getVertices()) {
+			slotsCount += Math.max(1, jobVertex.getParallelism());
+		}
+		cfg.setNumTaskManagerSlots(slotsCount);
+
+		LOG.info("Running job on local embedded Flink mini cluster");
+
+		MiniCluster miniCluster = new MiniCluster(cfg);
+		try {
+			miniCluster.start();
+			return miniCluster.runJobBlocking(jobGraph);
+		} finally {
+			transformations.clear();
+			miniCluster.shutdown();
+		}
+	}
+}