You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/05 01:59:50 UTC

[01/10] flink git commit: [FLINK-4928] [yarn] Implement FLIP-6 YARN Application Master Runner

Repository: flink
Updated Branches:
  refs/heads/flip-6 4afcc4abd -> 55e94c3c6


[FLINK-4928] [yarn] Implement FLIP-6 YARN Application Master Runner


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e57fba0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e57fba0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e57fba0

Branch: refs/heads/flip-6
Commit: 8e57fba073be139f69e072bdb4888d582fa7211a
Parents: e11ea3f
Author: shuai.xus <sh...@alibaba-inc.com>
Authored: Thu Nov 3 16:24:47 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 5 02:49:43 2016 +0100

----------------------------------------------------------------------
 .../resourcemanager/ResourceManager.java        |   2 +-
 .../apache/flink/yarn/YarnResourceManager.java  | 552 +++++++++++++++++++
 2 files changed, 553 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8e57fba0/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 76b4a86..3bcbfda 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -637,7 +637,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	 *
 	 * @param t The exception describing the fatal error
 	 */
-	void onFatalErrorAsync(final Throwable t) {
+	protected void onFatalErrorAsync(final Throwable t) {
 		runAsync(new Runnable() {
 			@Override
 			public void run() {

http://git-wip-us.apache.org/repos/asf/flink/blob/8e57fba0/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
new file mode 100644
index 0000000..6280bdf
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -0,0 +1,552 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+
+/**
+ * The yarn implementation of the resource manager. Used when the system is started
+ * via the resource framework YARN.
+ */
+public class YarnResourceManager extends ResourceManager<ResourceID> implements AMRMClientAsync.CallbackHandler {
+	protected final Logger LOG = LoggerFactory.getLogger(getClass());
+
+	/** The process environment variables */
+	private final Map<String, String> ENV;
+
+	/** The heartbeat interval while the resource master is waiting for containers */
+	private static final int FAST_YARN_HEARTBEAT_INTERVAL_MS = 500;
+
+	/** The default heartbeat interval during regular operation */
+	private static final int DEFAULT_YARN_HEARTBEAT_INTERVAL_MS = 5000;
+
+	/** The maximum time that TaskExecutors may be waiting to register at the ResourceManager before they quit */
+	private static final FiniteDuration TASKEXECUTOR_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
+
+	/** Environment variable name of the final container id used by the YarnResourceManager.
+	 * Container ID generation may vary across Hadoop versions. */
+	final static String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID";
+	
+	/** Environment variable name of the hostname provided by YARN.
+	 * The TaskExecutor uses this host name to start its port. */
+	final static String ENV_FLINK_NODE_ID = "_FLINK_NODE_ID";
+
+	/** Default heartbeat interval between this resource manager and the YARN ResourceManager */
+	private final int yarnHeartbeatIntervalMillis;
+
+	private final Configuration flinkConfig;
+
+	private final YarnConfiguration yarnConfig;
+
+	/** Client to communicate with the Resource Manager (YARN's master) */
+	private AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient;
+
+	/** Client to communicate with the Node manager and launch TaskExecutor processes */
+	private NMClient nodeManagerClient;
+
+	/** The number of containers requested, but not yet granted */
+	private int numPendingContainerRequests;
+
+	public YarnResourceManager(
+			Configuration flinkConfig,
+			Map<String, String> env,
+			RpcService rpcService,
+			ResourceManagerConfiguration resourceManagerConfiguration,
+			HighAvailabilityServices highAvailabilityServices,
+			SlotManagerFactory slotManagerFactory,
+			MetricRegistry metricRegistry,
+			JobLeaderIdService jobLeaderIdService,
+			FatalErrorHandler fatalErrorHandler) {
+		super(
+			rpcService,
+			resourceManagerConfiguration,
+			highAvailabilityServices,
+			slotManagerFactory,
+			metricRegistry,
+			jobLeaderIdService,
+			fatalErrorHandler);
+		this.flinkConfig  = flinkConfig;
+		this.yarnConfig = new YarnConfiguration();
+		this.ENV = env;
+		final int yarnHeartbeatIntervalMS = flinkConfig.getInteger(
+				ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, DEFAULT_YARN_HEARTBEAT_INTERVAL_MS / 1000) * 1000;
+
+		final long yarnExpiryIntervalMS = yarnConfig.getLong(
+				YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
+				YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS);
+
+		if (yarnHeartbeatIntervalMS >= yarnExpiryIntervalMS) {
+			log.warn("The heartbeat interval of the Flink Application master ({}) is greater " +
+					"than YARN's expiry interval ({}). The application is likely to be killed by YARN.",
+					yarnHeartbeatIntervalMS, yarnExpiryIntervalMS);
+		}
+		yarnHeartbeatIntervalMillis = yarnHeartbeatIntervalMS;
+		numPendingContainerRequests = 0;
+	}
+
+	@Override
+	protected void initialize() throws ResourceManagerException {
+		resourceManagerClient = AMRMClientAsync.createAMRMClientAsync(yarnHeartbeatIntervalMillis, this); // 'this' is the callback handler
+		resourceManagerClient.init(yarnConfig);
+		resourceManagerClient.start();
+		try {
+			//TODO: change akka address to tcp host and port, the getAddress() interface should return a standard tcp address
+			Tuple2<String, Integer> hostPort = parseHostPort(getAddress());
+			//TODO: the third parameter should be the webmonitor address
+			resourceManagerClient.registerApplicationMaster(hostPort.f0, hostPort.f1, getAddress());
+		} catch (Exception e) { // includes address parsing failures from parseHostPort
+			throw new ResourceManagerException("Could not register the application master at YARN", e); // fail instead of continuing unregistered
+		}
+
+		// create the client to communicate with the node managers
+		nodeManagerClient = NMClient.createNMClient();
+		nodeManagerClient.init(yarnConfig);
+		nodeManagerClient.start();
+		nodeManagerClient.cleanupRunningContainersOnStop(true);
+	}
+
+	@Override
+	public void shutDown() throws Exception {
+		// shut down all components
+		if (resourceManagerClient != null) {
+			try {
+				resourceManagerClient.stop();
+			} catch (Throwable t) {
+				LOG.error("Could not cleanly shut down the Asynchronous Resource Manager Client", t);
+			}
+		}
+		if (nodeManagerClient != null) {
+			try {
+				nodeManagerClient.stop();
+			} catch (Throwable t) {
+				LOG.error("Could not cleanly shut down the Node Manager Client", t);
+			}
+		}
+		super.shutDown();
+	}
+
+	@Override
+	protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
+
+		// first, de-register from YARN
+		FinalApplicationStatus yarnStatus = getYarnStatus(finalStatus);
+		LOG.info("Unregistering application from the YARN Resource Manager");
+		try {
+			resourceManagerClient.unregisterApplicationMaster(yarnStatus, optionalDiagnostics, "");
+		} catch (Throwable t) {
+			LOG.error("Could not unregister the application master.", t);
+		}
+	}
+
+	@Override
+	public void startNewWorker(ResourceProfile resourceProfile) {
+		// Priority for worker containers - priorities are intra-application
+		//TODO: set priority according to the resource allocated
+		Priority priority = Priority.newInstance(0);
+		int mem = resourceProfile.getMemoryInMB() <= Integer.MAX_VALUE ? (int)resourceProfile.getMemoryInMB() : Integer.MAX_VALUE; // clamp to int range
+		if (mem < 0) {
+			mem = 1024; // negative memory in the profile: fall back to 1024 MB
+		}
+		int vcore = Math.max(1, (int) Math.ceil(resourceProfile.getCpuCores())); // round fractional cores up, at least one vcore
+		Resource capability = Resource.newInstance(mem , vcore);
+		requestYarnContainer(capability, priority);
+	}
+
+	@Override
+	protected ResourceID workerStarted(ResourceID resourceID) {
+		return resourceID;
+	}
+
+	// AMRMClientAsync CallbackHandler methods
+	@Override
+	public float getProgress() {
+		// Temporarily need not record the total size of asked and allocated containers
+		return 1;
+	}
+
+	@Override
+	public void onContainersCompleted(List<ContainerStatus> list) {
+		for (ContainerStatus container : list) {
+			if (container.getExitStatus() < 0) {
+				notifyWorkerFailed(new ResourceID(container.getContainerId().toString()), container.getDiagnostics());
+				// TODO: notice job master slot fail
+			}
+		}
+	}
+
+	@Override
+	public void onContainersAllocated(List<Container> containers) {
+		for (Container container : containers) {
+			numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
+			LOG.info("Received new container: {} - Remaining pending container requests: {}",
+					container.getId(), numPendingContainerRequests);
+			try {
+				/** Context information used to start a TaskExecutor Java process */
+				ContainerLaunchContext taskExecutorLaunchContext =
+						createTaskExecutorLaunchContext(container.getResource(), container.getId().toString(), container.getNodeId().getHost());
+				nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
+			}
+			catch (Throwable t) {
+				// failed to launch the container, will release the failed one and ask for a new one
+				LOG.error("Could not start TaskManager in container " + container, t);
+				resourceManagerClient.releaseAssignedContainer(container.getId());
+				requestYarnContainer(container.getResource(), container.getPriority());
+			}
+		}
+		if (numPendingContainerRequests <= 0) {
+			resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
+		}
+	}
+
+	@Override
+	public void onShutdownRequest() {
+		// Nothing to do
+	}
+
+	@Override
+	public void onNodesUpdated(List<NodeReport> list) {
+		// We are not interested in node updates
+	}
+
+	@Override
+	public void onError(Throwable error) {
+		onFatalErrorAsync(error);
+	}
+
+	//Utility methods
+	/**
+	 * Converts a Flink application status enum to a YARN application status enum.
+	 * @param status The Flink application status.
+	 * @return The corresponding YARN application status.
+	 */
+	private FinalApplicationStatus getYarnStatus(ApplicationStatus status) {
+		if (status == null) {
+			return FinalApplicationStatus.UNDEFINED;
+		}
+		else {
+			switch (status) {
+				case SUCCEEDED:
+					return FinalApplicationStatus.SUCCEEDED;
+				case FAILED:
+					return FinalApplicationStatus.FAILED;
+				case CANCELED:
+					return FinalApplicationStatus.KILLED;
+				default:
+					return FinalApplicationStatus.UNDEFINED;
+			}
+		}
+	}
+
+	// Parses the host and port from an akka address such as
+	// akka.tcp://flink@100.81.153.180:49712/user/$a (throws a runtime exception on malformed input)
+	private static Tuple2<String, Integer> parseHostPort(String address) {
+		String[] hostPort = address.split("@")[1].split(":");
+		String host = hostPort[0];
+		String port = hostPort[1].split("/")[0];
+		return new Tuple2<>(host, Integer.valueOf(port));
+	}
+
+	private void requestYarnContainer(Resource resource, Priority priority) {
+		resourceManagerClient.addContainerRequest(
+				new AMRMClient.ContainerRequest(resource, null, null, priority));
+		// make sure we transmit the request fast and receive fast news of granted allocations
+		resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
+
+		numPendingContainerRequests++;
+		LOG.info("Requesting new TaskManager container pending requests: {}",
+				numPendingContainerRequests);
+	}
+
+	private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource, String containerId, String host)
+			throws Exception {
+		// init the ContainerLaunchContext
+		final String currDir = ENV.get(ApplicationConstants.Environment.PWD.key());
+
+		final ContaineredTaskManagerParameters taskManagerParameters =
+				ContaineredTaskManagerParameters.create(flinkConfig, resource.getMemory(), 1);
+
+		LOG.info("TaskExecutor{} will be started with container size {} MB, JVM heap size {} MB, " +
+				"JVM direct memory limit {} MB",
+				containerId,
+				taskManagerParameters.taskManagerTotalMemoryMB(),
+				taskManagerParameters.taskManagerHeapSizeMB(),
+				taskManagerParameters.taskManagerDirectMemoryLimitMB());
+		final Configuration taskManagerConfig = BootstrapTools.generateTaskManagerConfiguration(
+				flinkConfig, "", 0, 1, TASKEXECUTOR_REGISTRATION_TIMEOUT);
+		LOG.debug("TaskManager configuration: {}", taskManagerConfig);
+
+		ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorContext(
+				flinkConfig, yarnConfig, ENV,
+				taskManagerParameters, taskManagerConfig,
+				currDir, YarnTaskExecutorRunner.class, LOG);
+
+		// set a special environment variable to uniquely identify this container
+		taskExecutorLaunchContext.getEnvironment()
+				.put(ENV_FLINK_CONTAINER_ID, containerId);
+		taskExecutorLaunchContext.getEnvironment()
+				.put(ENV_FLINK_NODE_ID, host);
+		return taskExecutorLaunchContext;
+	}
+
+
+	/**
+	 * Creates the launch context, which describes how to bring up a TaskExecutor process in
+	 * an allocated YARN container.
+	 *
+	 * <p>This code is extremely YARN specific and registers all the resources that the TaskExecutor
+	 * needs (such as JAR file, config file, ...) and all environment variables in a YARN
+	 * container launch context. The launch context then ensures that those resources will be
+	 * copied into the container's transient working directory.
+	 *
+	 * @param flinkConfig
+	 *		 The Flink configuration object.
+	 * @param yarnConfig
+	 *		 The YARN configuration object.
+	 * @param env
+	 *		 The environment variables.
+	 * @param tmParams
+	 *		 The TaskExecutor container memory parameters.
+	 * @param taskManagerConfig
+	 *		 The configuration for the TaskExecutors.
+	 * @param workingDirectory
+	 *		 The current application master container's working directory.
+	 * @param taskManagerMainClass
+	 *		 The class with the main method.
+	 * @param log
+	 *		 The logger.
+	 *
+	 * @return The launch context for the TaskManager processes.
+	 *
+	 * @throws Exception Thrown if the launch context could not be created, for example if
+	 *				   the resources could not be copied.
+	 */
+	private static ContainerLaunchContext createTaskExecutorContext(
+			Configuration flinkConfig,
+			YarnConfiguration yarnConfig,
+			Map<String, String> env,
+			ContaineredTaskManagerParameters tmParams,
+			Configuration taskManagerConfig,
+			String workingDirectory,
+			Class<?> taskManagerMainClass,
+			Logger log) throws Exception {
+
+		// get and validate all relevant variables
+
+		String remoteFlinkJarPath = env.get(YarnConfigKeys.FLINK_JAR_PATH);
+		
+		String appId = env.get(YarnConfigKeys.ENV_APP_ID);
+
+		String clientHomeDir = env.get(YarnConfigKeys.ENV_CLIENT_HOME_DIR);
+
+		String shipListString = env.get(YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
+
+		String yarnClientUsername = env.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+
+		final String remoteKeytabPath = env.get(YarnConfigKeys.KEYTAB_PATH);
+		log.info("TM:remote keytab path obtained {}", remoteKeytabPath);
+
+		final String remoteKeytabPrincipal = env.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+		log.info("TM:remote keytab principal obtained {}", remoteKeytabPrincipal);
+
+		final String remoteYarnConfPath = env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH);
+		log.info("TM:remote yarn conf path obtained {}", remoteYarnConfPath);
+
+		final String remoteKrb5Path = env.get(YarnConfigKeys.ENV_KRB5_PATH);
+		log.info("TM:remote krb5 path obtained {}", remoteKrb5Path);
+
+		String classPathString = env.get(YarnConfigKeys.ENV_FLINK_CLASSPATH);
+
+		// obtain a handle to the file system used by YARN
+		final org.apache.hadoop.fs.FileSystem yarnFileSystem;
+		try {
+			yarnFileSystem = org.apache.hadoop.fs.FileSystem.get(yarnConfig);
+		} catch (IOException e) {
+			throw new Exception("Could not access YARN's default file system", e);
+		}
+
+		//register keytab
+		LocalResource keytabResource = null;
+		if(remoteKeytabPath != null) {
+			log.info("Adding keytab {} to the AM container local resource bucket", remoteKeytabPath);
+			keytabResource = Records.newRecord(LocalResource.class);
+			Path keytabPath = new Path(remoteKeytabPath);
+			Utils.registerLocalResource(yarnFileSystem, keytabPath, keytabResource);
+		}
+
+		//To support Yarn Secure Integration Test Scenario
+		LocalResource yarnConfResource = null;
+		LocalResource krb5ConfResource = null;
+		boolean hasKrb5 = false;
+		if(remoteYarnConfPath != null && remoteKrb5Path != null) {
+			log.info("TM:Adding remoteYarnConfPath {} to the container local resource bucket", remoteYarnConfPath);
+			yarnConfResource = Records.newRecord(LocalResource.class);
+			Path yarnConfPath = new Path(remoteYarnConfPath);
+			Utils.registerLocalResource(yarnFileSystem, yarnConfPath, yarnConfResource);
+
+			log.info("TM:Adding remoteKrb5Path {} to the container local resource bucket", remoteKrb5Path);
+			krb5ConfResource = Records.newRecord(LocalResource.class);
+			Path krb5ConfPath = new Path(remoteKrb5Path);
+			Utils.registerLocalResource(yarnFileSystem, krb5ConfPath, krb5ConfResource);
+
+			hasKrb5 = true;
+		}
+
+		// register Flink Jar with remote HDFS
+		LocalResource flinkJar = Records.newRecord(LocalResource.class);
+		{
+			Path remoteJarPath = new Path(remoteFlinkJarPath);
+			Utils.registerLocalResource(yarnFileSystem, remoteJarPath, flinkJar);
+		}
+
+		// register conf with local fs
+		LocalResource flinkConf = Records.newRecord(LocalResource.class);
+		{
+			// write the TaskManager configuration to a local file
+			final File taskManagerConfigFile =
+					new File(workingDirectory, UUID.randomUUID() + "-taskmanager-conf.yaml");
+			log.debug("Writing TaskManager configuration to {}", taskManagerConfigFile.getAbsolutePath());
+			BootstrapTools.writeConfiguration(taskManagerConfig, taskManagerConfigFile);
+
+			Utils.setupLocalResource(yarnFileSystem, appId,
+					new Path(taskManagerConfigFile.toURI()), flinkConf, new Path(clientHomeDir));
+
+			log.info("Prepared local resource for modified yaml: {}", flinkConf);
+		}
+
+		Map<String, LocalResource> taskManagerLocalResources = new HashMap<>();
+		taskManagerLocalResources.put("flink.jar", flinkJar);
+		taskManagerLocalResources.put("flink-conf.yaml", flinkConf);
+
+		//To support Yarn Secure Integration Test Scenario
+		if(yarnConfResource != null && krb5ConfResource != null) {
+			taskManagerLocalResources.put(Utils.YARN_SITE_FILE_NAME, yarnConfResource);
+			taskManagerLocalResources.put(Utils.KRB5_FILE_NAME, krb5ConfResource);
+		}
+
+		if(keytabResource != null) {
+			taskManagerLocalResources.put(Utils.KEYTAB_FILE_NAME, keytabResource);
+		}
+
+		// prepare additional files to be shipped
+		for (String pathStr : shipListString.split(",")) {
+			if (!pathStr.isEmpty()) {
+				LocalResource resource = Records.newRecord(LocalResource.class);
+				Path path = new Path(pathStr);
+				Utils.registerLocalResource(yarnFileSystem, path, resource);
+				taskManagerLocalResources.put(path.getName(), resource);
+			}
+		}
+
+		// now that all resources are prepared, we can create the launch context
+
+		log.info("Creating container launch context for TaskManagers");
+
+		boolean hasLogback = new File(workingDirectory, "logback.xml").exists();
+		boolean hasLog4j = new File(workingDirectory, "log4j.properties").exists();
+
+		String launchCommand = BootstrapTools.getTaskManagerShellCommand(
+				flinkConfig, tmParams, ".", ApplicationConstants.LOG_DIR_EXPANSION_VAR,
+				hasLogback, hasLog4j, hasKrb5, taskManagerMainClass);
+
+		log.info("Starting TaskManagers with command: " + launchCommand);
+
+		ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
+		ctx.setCommands(Collections.singletonList(launchCommand));
+		ctx.setLocalResources(taskManagerLocalResources);
+
+		Map<String, String> containerEnv = new HashMap<>();
+		containerEnv.putAll(tmParams.taskManagerEnv());
+
+		// add YARN classpath, etc to the container environment
+		containerEnv.put(ENV_FLINK_CLASSPATH, classPathString);
+		Utils.setupYarnClassPath(yarnConfig, containerEnv);
+
+		containerEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName());
+
+		if(remoteKeytabPath != null && remoteKeytabPrincipal != null) {
+			containerEnv.put(YarnConfigKeys.KEYTAB_PATH, remoteKeytabPath);
+			containerEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, remoteKeytabPrincipal);
+		}
+
+		ctx.setEnvironment(containerEnv);
+
+		try (DataOutputBuffer dob = new DataOutputBuffer()) {
+			log.debug("Adding security tokens to Task Executor Container launch Context....");
+			UserGroupInformation user = UserGroupInformation.getCurrentUser();
+			Credentials credentials = user.getCredentials();
+			credentials.writeTokenStorageToStream(dob);
+			ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+			ctx.setTokens(securityTokens);
+		}
+		catch (Throwable t) {
+			log.error("Getting current user info failed when trying to launch the container", t);
+		}
+
+		return ctx;
+	}
+}


[07/10] flink git commit: [FLINK-5239] [distributed coordination] RPC service properly unpacks 'InvocationTargetExceptions'

Posted by se...@apache.org.
[FLINK-5239] [distributed coordination] RPC service properly unpacks 'InvocationTargetExceptions'


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/887cbb90
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/887cbb90
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/887cbb90

Branch: refs/heads/flip-6
Commit: 887cbb9095af92e4788c06ba0307cc9db5c5b948
Parents: 44fc46d
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Dec 2 18:49:21 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 5 02:49:43 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/rpc/akka/AkkaRpcActor.java    | 16 +++-
 .../runtime/rpc/akka/AkkaRpcActorTest.java      | 89 +++++++++++++++++++-
 2 files changed, 102 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/887cbb90/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index fe6b23b..264ba96 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
@@ -180,8 +181,19 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp
 				if (rpcMethod.getReturnType().equals(Void.TYPE)) {
 					// No return value to send back
 					rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
-				} else {
-					Object result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
+				}
+				else {
+					final Object result;
+					try {
+						result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
+					}
+					catch (InvocationTargetException e) {
+						LOG.trace("Reporting back error thrown in remote procedure {}", rpcMethod, e);
+
+						// tell the sender about the failure
+						getSender().tell(new Status.Failure(e.getTargetException()), getSelf());
+						return;
+					}
 
 					if (result instanceof Future) {
 						final Future<?> future = (Future<?>) result;

http://git-wip-us.apache.org/repos/asf/flink/blob/887cbb90/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 760e1a7..c73240c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -22,6 +22,7 @@ import akka.actor.ActorSystem;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
@@ -30,6 +31,7 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.apache.flink.util.TestLogger;
+
 import org.hamcrest.core.Is;
 import org.junit.AfterClass;
 import org.junit.Test;
@@ -86,7 +88,7 @@ public class AkkaRpcActorTest extends TestLogger {
 		Future<DummyRpcGateway> futureRpcGateway = akkaRpcService.connect("foobar", DummyRpcGateway.class);
 
 		try {
-			DummyRpcGateway gateway = futureRpcGateway.get(timeout.getSize(), timeout.getUnit());
+			futureRpcGateway.get(timeout.getSize(), timeout.getUnit());
 
 			fail("The rpc connection resolution should have failed.");
 		} catch (ExecutionException exception) {
@@ -192,6 +194,48 @@ public class AkkaRpcActorTest extends TestLogger {
 		terminationFuture.get();
 	}
 
+	@Test
+	public void testExceptionPropagation() throws Exception {
+		ExceptionalEndpoint rpcEndpoint = new ExceptionalEndpoint(akkaRpcService);
+		rpcEndpoint.start();
+
+		ExceptionalGateway rpcGateway = rpcEndpoint.getSelf();
+		Future<Integer> result = rpcGateway.doStuff();
+
+		try {
+			result.get(timeout.getSize(), timeout.getUnit());
+			fail("this should fail with an exception");
+		}
+		catch (ExecutionException e) {
+			Throwable cause = e.getCause();
+			assertEquals(RuntimeException.class, cause.getClass());
+			assertEquals("my super specific test exception", cause.getMessage());
+		}
+	}
+
+	@Test
+	public void testExceptionPropagationFuturePiping() throws Exception {
+		ExceptionalFutureEndpoint rpcEndpoint = new ExceptionalFutureEndpoint(akkaRpcService);
+		rpcEndpoint.start();
+
+		ExceptionalGateway rpcGateway = rpcEndpoint.getSelf();
+		Future<Integer> result = rpcGateway.doStuff();
+
+		try {
+			result.get(timeout.getSize(), timeout.getUnit());
+			fail("this should fail with an exception");
+		}
+		catch (ExecutionException e) {
+			Throwable cause = e.getCause();
+			assertEquals(Exception.class, cause.getClass());
+			assertEquals("some test", cause.getMessage());
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Test Actors and Interfaces
+	// ------------------------------------------------------------------------
+
 	private interface DummyRpcGateway extends RpcGateway {
 		Future<Integer> foobar();
 	}
@@ -218,4 +262,47 @@ public class AkkaRpcActorTest extends TestLogger {
 			_foobar = value;
 		}
 	}
+
+	// ------------------------------------------------------------------------
+
+	private interface ExceptionalGateway extends RpcGateway {
+		Future<Integer> doStuff();
+	}
+
+	private static class ExceptionalEndpoint extends RpcEndpoint<ExceptionalGateway> {
+
+		protected ExceptionalEndpoint(RpcService rpcService) {
+			super(rpcService);
+		}
+
+		@RpcMethod
+		public int doStuff() {
+			throw new RuntimeException("my super specific test exception");
+		}
+	}
+
+	private static class ExceptionalFutureEndpoint extends RpcEndpoint<ExceptionalGateway> {
+
+		protected ExceptionalFutureEndpoint(RpcService rpcService) {
+			super(rpcService);
+		}
+
+		@RpcMethod
+		public Future<Integer> doStuff() {
+			final FlinkCompletableFuture<Integer> future = new FlinkCompletableFuture<>();
+
+			// complete the future slightly in the, well, future...
+			new Thread() {
+				@Override
+				public void run() {
+					try {
+						Thread.sleep(10);
+					} catch (InterruptedException ignored) {}
+					future.completeExceptionally(new Exception("some test"));
+				}
+			}.start();
+
+			return future;
+		}
+	}
 }


[06/10] flink git commit: [FLINK-5190] [runtime] Fix ZooKeeperLeaderRetrievalService closing the ZK client when stopping

Posted by se...@apache.org.
[FLINK-5190] [runtime] Fix ZooKeeperLeaderRetrievalService closing the ZK client when stopping


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e11ea3f5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e11ea3f5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e11ea3f5

Branch: refs/heads/flip-6
Commit: e11ea3f52fc1e1676bfdbb8171c804cfa184f2b0
Parents: 887cbb9
Author: shuai.xus <sh...@alibaba-inc.com>
Authored: Wed Nov 30 17:02:49 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 5 02:49:43 2016 +0100

----------------------------------------------------------------------
 .../runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java    | 1 -
 1 file changed, 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e11ea3f5/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
index 4587bad..f74fb1a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
@@ -96,7 +96,6 @@ public class ZooKeeperLeaderRetrievalService implements LeaderRetrievalService,
 		client.getConnectionStateListenable().removeListener(connectionStateListener);
 
 		cache.close();
-		client.close();
 	}
 
 	@Override


[03/10] flink git commit: [FLINK-5238] [minicluster] MiniCluster starts local communication if only one TaskManager is used

Posted by se...@apache.org.
[FLINK-5238] [minicluster] MiniCluster starts local communication if only one TaskManager is used


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/62e8e33f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/62e8e33f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/62e8e33f

Branch: refs/heads/flip-6
Commit: 62e8e33f341e95b70e090a6d0f7d5e75b9c4d4c9
Parents: 6b3283e
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Dec 2 17:00:25 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 5 02:49:43 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/minicluster/MiniCluster.java  |  4 +++-
 .../runtime/taskexecutor/TaskManagerRunner.java | 22 +++++++++++++++-----
 2 files changed, 20 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/62e8e33f/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 1b9f265..29a6e59 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -525,6 +525,7 @@ public class MiniCluster {
 			RpcService[] taskManagerRpcServices) throws Exception {
 
 		final TaskManagerRunner[] taskManagerRunners = new TaskManagerRunner[numTaskManagers];
+		final boolean localCommunication = numTaskManagers == 1;
 
 		for (int i = 0; i < numTaskManagers; i++) {
 			taskManagerRunners[i] = new TaskManagerRunner(
@@ -532,7 +533,8 @@ public class MiniCluster {
 				new ResourceID(UUID.randomUUID().toString()),
 				taskManagerRpcServices[i],
 				haServices,
-				metricRegistry);
+				metricRegistry,
+				localCommunication);
 
 			taskManagerRunners[i].start();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/62e8e33f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index a18ff40..1145a46 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -66,11 +66,22 @@ public class TaskManagerRunner implements FatalErrorHandler {
 	private final TaskExecutor taskManager;
 
 	public TaskManagerRunner(
+			Configuration configuration,
+			ResourceID resourceID,
+			RpcService rpcService,
+			HighAvailabilityServices highAvailabilityServices,
+			MetricRegistry metricRegistry) throws Exception {
+
+		this(configuration, resourceID, rpcService, highAvailabilityServices, metricRegistry, false);
+	}
+
+	public TaskManagerRunner(
 		Configuration configuration,
 		ResourceID resourceID,
 		RpcService rpcService,
 		HighAvailabilityServices highAvailabilityServices,
-		MetricRegistry metricRegistry) throws Exception {
+		MetricRegistry metricRegistry,
+		boolean localCommunicationOnly) throws Exception {
 
 		this.configuration = Preconditions.checkNotNull(configuration);
 		this.resourceID = Preconditions.checkNotNull(resourceID);
@@ -80,10 +91,11 @@ public class TaskManagerRunner implements FatalErrorHandler {
 
 		InetAddress remoteAddress = InetAddress.getByName(rpcService.getAddress());
 
-		TaskManagerServicesConfiguration taskManagerServicesConfiguration = TaskManagerServicesConfiguration.fromConfiguration(
-			configuration,
-			remoteAddress,
-			false);
+		TaskManagerServicesConfiguration taskManagerServicesConfiguration = 
+				TaskManagerServicesConfiguration.fromConfiguration(
+						configuration,
+						remoteAddress,
+						localCommunicationOnly);
 
 		TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
 			taskManagerServicesConfiguration,


[08/10] flink git commit: [FLINK-5140] [JobManager] SlotPool accepts allocation requests while ResourceManager is not connected

Posted by se...@apache.org.
[FLINK-5140] [JobManager] SlotPool accepts allocation requests while ResourceManager is not connected

The requests are kept for a certain time and fulfilled once the ResourceManager is connected.
If no ResourceManager is connected in time, the allocation requests are failed.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b3283ec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b3283ec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b3283ec

Branch: refs/heads/flip-6
Commit: 6b3283ecd980e3db5d5b6cca86885d0dfad6e2cd
Parents: 82c1fcf
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Dec 2 16:17:11 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 5 02:49:43 2016 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/instance/SlotPool.java |  76 ++++++++++++--
 .../flink/runtime/instance/SlotPoolRpcTest.java | 101 +++++++++++++++++++
 .../flink/runtime/instance/SlotPoolTest.java    |  27 -----
 3 files changed, 166 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6b3283ec/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 65a5c45..1a2adfe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -93,8 +93,6 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 
 	// ------------------------------------------------------------------------
 
-	private final Object lock = new Object();
-
 	private final JobID jobId;
 
 	private final ProviderAndOwner providerAndOwner;
@@ -111,6 +109,9 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 	/** All pending requests waiting for slots */
 	private final HashMap<AllocationID, PendingRequest> pendingRequests;
 
+	/** The requests that are waiting for the resource manager to be connected */
+	private final HashMap<AllocationID, PendingRequest> waitingForResourceManager;
+
 	/** Timeout for request calls to the ResourceManager */
 	private final Time resourceManagerRequestsTimeout;
 
@@ -154,6 +155,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 		this.allocatedSlots = new AllocatedSlots();
 		this.availableSlots = new AvailableSlots();
 		this.pendingRequests = new HashMap<>();
+		this.waitingForResourceManager = new HashMap<>();
 
 		this.providerAndOwner = new ProviderAndOwner(getSelf(), slotRequestTimeout);
 	}
@@ -233,6 +235,14 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 	public void connectToResourceManager(UUID resourceManagerLeaderId, ResourceManagerGateway resourceManagerGateway) {
 		this.resourceManagerLeaderId = checkNotNull(resourceManagerLeaderId);
 		this.resourceManagerGateway = checkNotNull(resourceManagerGateway);
+
+		// work on all slots waiting for this connection
+		for (PendingRequest pending : waitingForResourceManager.values()) {
+			requestSlotFromResourceManager(pending.allocationID(), pending.future(), pending.resourceProfile());
+		}
+
+		// all sent off
+		waitingForResourceManager.clear();
 	}
 
 	@RpcMethod
@@ -273,16 +283,27 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 			return FlinkCompletableFuture.completed(slot);
 		}
 
-		// (2) no slot available, and no resource manager connection
+		// the request will be completed by a future
+		final AllocationID allocationID = new AllocationID();
+		final FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
+
+		// (2) need to request a slot
+
 		if (resourceManagerGateway == null) {
-			return FlinkCompletableFuture.completedExceptionally(
-					new NoResourceAvailableException("not connected to ResourceManager and no slot available"));
-			
+			// no slot available, and no resource manager connection
+			stashRequestWaitingForResourceManager(allocationID, resources, future);
+		} else {
+			// we have a resource manager connection, so let's ask it for more resources
+			requestSlotFromResourceManager(allocationID, future, resources);
 		}
 
-		// (3) we have a resource manager connection, so let's ask it for more resources
-		final FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
-		final AllocationID allocationID = new AllocationID();
+		return future;
+	}
+
+	private void requestSlotFromResourceManager(
+			final AllocationID allocationID,
+			final FlinkCompletableFuture<SimpleSlot> future,
+			final ResourceProfile resources) {
 
 		LOG.info("Requesting slot with profile {} from resource manager (request = {}).", resources, allocationID);
 
@@ -327,8 +348,6 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 				return null;
 			}
 		}, getMainThreadExecutor());
-
-		return future;
 	}
 
 	private void slotRequestToResourceManagerSuccess(final AllocationID allocationID) {
@@ -357,6 +376,32 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 		}
 	}
 
+	private void stashRequestWaitingForResourceManager(
+			final AllocationID allocationID,
+			final ResourceProfile resources,
+			final FlinkCompletableFuture<SimpleSlot> future) {
+
+		LOG.info("Cannot serve slot request, no ResourceManager connected. " +
+				"Adding as pending request {}",  allocationID);
+
+		waitingForResourceManager.put(allocationID, new PendingRequest(allocationID, future, resources));
+
+		scheduleRunAsync(new Runnable() {
+			@Override
+			public void run() {
+				checkTimeoutRequestWaitingForResourceManager(allocationID);
+			}
+		}, resourceManagerRequestsTimeout);
+	}
+
+	private void checkTimeoutRequestWaitingForResourceManager(AllocationID allocationID) {
+		PendingRequest request = waitingForResourceManager.remove(allocationID);
+		if (request != null && !request.future().isDone()) {
+			request.future().completeExceptionally(new NoResourceAvailableException(
+					"No slot available and no connection to Resource Manager established."));
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Slot releasing & offering
 	// ------------------------------------------------------------------------
@@ -401,6 +446,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 	private PendingRequest pollMatchingPendingRequest(final AllocatedSlot slot) {
 		final ResourceProfile slotResources = slot.getResourceProfile();
 
+		// try the requests sent to the resource manager first
 		for (PendingRequest request : pendingRequests.values()) {
 			if (slotResources.isMatching(request.resourceProfile())) {
 				pendingRequests.remove(request.allocationID());
@@ -408,6 +454,14 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 			}
 		}
 
+		// try the requests waiting for a resource manager connection next
+		for (PendingRequest request : waitingForResourceManager.values()) {
+			if (slotResources.isMatching(request.resourceProfile())) {
+				waitingForResourceManager.remove(request.allocationID());
+				return request;
+			}
+		}
+
 		// no request pending, or no request matches
 		return null;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b3283ec/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
new file mode 100644
index 0000000..89fd22f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.util.clock.SystemClock;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the SlotPool using a proper RPC setup.
+ */
+public class SlotPoolRpcTest {
+
+	private static RpcService rpcService;
+
+	// ------------------------------------------------------------------------
+	//  setup
+	// ------------------------------------------------------------------------
+
+	@BeforeClass
+	public static void setup() {
+		ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+		rpcService = new AkkaRpcService(actorSystem, Time.seconds(10));
+	}
+
+	@AfterClass
+	public static  void shutdown() {
+		rpcService.stopService();
+	}
+
+	// ------------------------------------------------------------------------
+	//  tests
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testSlotAllocationNoResourceManager() throws Exception {
+		final JobID jid = new JobID();
+		
+		final SlotPool pool = new SlotPool(
+				rpcService, jid,
+				SystemClock.getInstance(),
+				Time.days(1), Time.days(1),
+				Time.milliseconds(100) // this is the timeout for the request tested here
+		);
+		pool.start(UUID.randomUUID());
+
+		Future<SimpleSlot> future = pool.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null);
+
+		try {
+			future.get(4, TimeUnit.SECONDS);
+			fail("We expected a ExecutionException.");
+		}
+		catch (ExecutionException e) {
+			assertEquals(NoResourceAvailableException.class, e.getCause().getClass());
+		}
+		catch (TimeoutException e) {
+			fail("future timed out rather than being failed");
+		}
+		catch (Exception e) {
+			fail("wrong exception: " + e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b3283ec/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index 5fa7af3..97457e1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
@@ -42,17 +41,13 @@ import org.mockito.ArgumentCaptor;
 
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -123,28 +118,6 @@ public class SlotPoolTest extends TestLogger {
 	}
 
 	@Test
-	public void testAllocateSlotWithoutResourceManager() throws Exception {
-		slotPool.disconnectResourceManager();
-		Future<SimpleSlot> future = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
-		future.handleAsync(
-			new BiFunction<SimpleSlot, Throwable, Void>() {
-				@Override
-				public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
-					assertNull(simpleSlot);
-					assertNotNull(throwable);
-					return null;
-				}
-			},
-			rpcService.getExecutor());
-		try {
-			future.get(1, TimeUnit.SECONDS);
-			fail("We expected a ExecutionException.");
-		} catch (ExecutionException ex) {
-			// we expect the exception
-		}
-	}
-
-	@Test
 	public void testAllocationFulfilledByReturnedSlot() throws Exception {
 		ResourceID resourceID = new ResourceID("resource");
 		slotPool.registerTaskManager(resourceID);


[02/10] flink git commit: [hotfix] Improve logging and thread characteristics for 'EmbeddedNonHaServices'

Posted by se...@apache.org.
[hotfix] Improve logging and thread characteristics for 'EmbeddedNonHaServices'


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/44fc46db
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/44fc46db
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/44fc46db

Branch: refs/heads/flip-6
Commit: 44fc46dba0dcf91ee0f430f1e37f9f28e49ebbc2
Parents: 62e8e33
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Dec 2 17:43:10 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 5 02:49:43 2016 +0100

----------------------------------------------------------------------
 .../runtime/highavailability/EmbeddedNonHaServices.java     | 7 +++++--
 .../highavailability/nonha/AbstractNonHaServices.java       | 9 +++++++--
 .../highavailability/nonha/EmbeddedLeaderService.java       | 5 ++++-
 .../src/test/resources/log4j-test.properties                | 2 +-
 4 files changed, 17 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/44fc46db/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
index 523218e..b91cec1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
@@ -56,7 +56,10 @@ public class EmbeddedNonHaServices extends AbstractNonHaServices implements High
 
 	@Override
 	public void shutdown() throws Exception {
-		super.shutdown();
-		resourceManagerLeaderService.shutdown();
+		try {
+			super.shutdown();
+		} finally {
+			resourceManagerLeaderService.shutdown();
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44fc46db/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
index 237727f..474faa8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
@@ -55,7 +55,7 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices
 
 	private final HashMap<JobID, EmbeddedLeaderService> jobManagerLeaderServices;
 
-	private final RunningJobsRegistry runningJobsRegistry;
+	private final NonHaRegistry runningJobsRegistry;
 
 	private boolean shutdown;
 
@@ -167,8 +167,13 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices
 
 		@Override
 		public Thread newThread(@Nonnull Runnable r) {
-			Thread thread = new Thread(r, "Flink HA services thread #" + enumerator.incrementAndGet());
+			Thread thread = new Thread(r, "Flink HA Services Thread #" + enumerator.incrementAndGet());
+
+			// HA threads should have a very high priority, but not
+			// keep the JVM running by themselves
+			thread.setPriority(Thread.MAX_PRIORITY);
 			thread.setDaemon(true);
+
 			return thread;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/44fc46db/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
index 84ac551..9fad9be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
@@ -225,7 +225,7 @@ public class EmbeddedLeaderService {
 				// check if the confirmation is for the same grant, or whether it is a stale grant 
 				if (service == currentLeaderProposed && currentLeaderSessionId.equals(leaderSessionId)) {
 					final String address = service.contender.getAddress();
-					LOG.info("Received confirmation of leadership for leader {} / session={}", address, leaderSessionId);
+					LOG.info("Received confirmation of leadership for leader {} , session={}", address, leaderSessionId);
 
 					// mark leadership
 					currentLeaderConfirmed = service;
@@ -271,6 +271,9 @@ public class EmbeddedLeaderService {
 				currentLeaderSessionId = leaderSessionId;
 				currentLeaderProposed = leaderService;
 
+				LOG.info("Proposing leadership to contender {} @ {}",
+						leaderService.contender, leaderService.contender.getAddress());
+
 				notificationExecutor.execute(
 						new GrantLeadershipCall(leaderService.contender, leaderSessionId, LOG));
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/44fc46db/flink-streaming-java/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/resources/log4j-test.properties b/flink-streaming-java/src/test/resources/log4j-test.properties
index 881dc06..e7cd3e0 100644
--- a/flink-streaming-java/src/test/resources/log4j-test.properties
+++ b/flink-streaming-java/src/test/resources/log4j-test.properties
@@ -24,4 +24,4 @@ log4j.appender.A1=org.apache.log4j.ConsoleAppender
 
 # A1 uses PatternLayout.
 log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+log4j.appender.A1.layout.ConversionPattern=%-5r [%-38t] %-5p %-60c %x - %m%n


[05/10] flink git commit: [FLINK-5141] [runtime] Add 'waitUntilTaskManagerRegistrationsComplete()' to MiniCluster

Posted by se...@apache.org.
[FLINK-5141] [runtime] Add 'waitUntilTaskManagerRegistrationsComplete()' to MiniCluster


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/82c1fcfa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/82c1fcfa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/82c1fcfa

Branch: refs/heads/flip-6
Commit: 82c1fcfa1f34b963f45146830d51b1490b0dc1e3
Parents: c0086b5
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 30 17:35:47 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 5 02:49:43 2016 +0100

----------------------------------------------------------------------
 .../leaderelection/LeaderAddressAndId.java      | 73 +++++++++++++++++
 .../flink/runtime/minicluster/MiniCluster.java  | 58 ++++++++++++-
 .../minicluster/MiniClusterJobDispatcher.java   |  2 +-
 .../OneTimeLeaderListenerFuture.java            | 60 ++++++++++++++
 .../resourcemanager/ResourceManager.java        | 11 +++
 .../resourcemanager/ResourceManagerGateway.java |  8 ++
 .../runtime/minicluster/MiniClusterITCase.java  |  8 ++
 .../Flip6LocalStreamEnvironment.java            | 23 +++---
 .../LocalStreamEnvironmentITCase.java           | 81 +++++++++++++++++++
 .../flink/core/testutils/CheckedThread.java     | 85 ++++++++++++++++++++
 10 files changed, 392 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderAddressAndId.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderAddressAndId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderAddressAndId.java
new file mode 100644
index 0000000..23cd34b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderAddressAndId.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A combination of a leader address and leader id.
+ */
+public class LeaderAddressAndId {
+
+	private final String leaderAddress;
+	private final UUID leaderId;
+
+	public LeaderAddressAndId(String leaderAddress, UUID leaderId) {
+		this.leaderAddress = checkNotNull(leaderAddress);
+		this.leaderId = checkNotNull(leaderId);
+	}
+
+	// ------------------------------------------------------------------------
+
+	public String leaderAddress() {
+		return leaderAddress;
+	}
+
+	public UUID leaderId() {
+		return leaderId;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		return 31 * leaderAddress.hashCode()+ leaderId.hashCode();
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		else if (o != null && o.getClass() == LeaderAddressAndId.class) {
+			final LeaderAddressAndId that = (LeaderAddressAndId) o;
+			return this.leaderAddress.equals(that.leaderAddress) && this.leaderId.equals(that.leaderId);
+		}
+		else {
+			return false;
+		}
+	}
+
+	@Override
+	public String toString() {
+		return "LeaderAddressAndId (" + leaderAddress + " / " + leaderId + ')';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 3ede5b5..1b9f265 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -27,11 +27,15 @@ import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.leaderelection.LeaderAddressAndId;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRunner;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
@@ -169,6 +173,7 @@ public class MiniCluster {
 			final boolean singleRpc = config.getUseSingleRpcSystem();
 
 			try {
+				LOG.info("Starting Metrics Registry");
 				metricRegistry = createMetricRegistry(configuration);
 
 				RpcService[] jobManagerRpcServices = new RpcService[numJobManagers];
@@ -176,10 +181,12 @@ public class MiniCluster {
 				RpcService[] resourceManagerRpcServices = new RpcService[numResourceManagers];
 
 				// bring up all the RPC services
-				if (singleRpc) {
-					// one common RPC for all
-					commonRpcService = createRpcService(configuration, rpcTimeout, false, null);
+				LOG.info("Starting RPC Service(s)");
+
+				// we always need the 'commonRpcService' for auxiliary calls
+				commonRpcService = createRpcService(configuration, rpcTimeout, false, null);
 
+				if (singleRpc) {
 					// set that same RPC service for all JobManagers and TaskManagers
 					for (int i = 0; i < numJobManagers; i++) {
 						jobManagerRpcServices[i] = commonRpcService;
@@ -236,7 +243,7 @@ public class MiniCluster {
 						configuration, haServices, metricRegistry, numTaskManagers, taskManagerRpcServices);
 
 				// bring up the dispatcher that launches JobManagers when jobs submitted
-				LOG.info("Starting job dispatcher for {} JobManger(s)", numJobManagers);
+				LOG.info("Starting job dispatcher(s) for {} JobManger(s)", numJobManagers);
 				jobDispatcher = new MiniClusterJobDispatcher(
 						configuration, haServices, metricRegistry, numJobManagers, jobManagerRpcServices);
 			}
@@ -357,6 +364,49 @@ public class MiniCluster {
 		}
 	}
 
+	public void waitUntilTaskManagerRegistrationsComplete() throws Exception {
+		LeaderRetrievalService rmMasterListener = null;
+		Future<LeaderAddressAndId> addressAndIdFuture;
+
+		try {
+			synchronized (lock) {
+				checkState(running, "FlinkMiniCluster is not running");
+
+				OneTimeLeaderListenerFuture listenerFuture = new OneTimeLeaderListenerFuture();
+				rmMasterListener = haServices.getResourceManagerLeaderRetriever();
+				rmMasterListener.start(listenerFuture);
+				addressAndIdFuture = listenerFuture.future(); 
+			}
+
+			final LeaderAddressAndId addressAndId = addressAndIdFuture.get();
+
+			final ResourceManagerGateway resourceManager = 
+					commonRpcService.connect(addressAndId.leaderAddress(), ResourceManagerGateway.class).get();
+
+			final int numTaskManagersToWaitFor = taskManagerRunners.length;
+
+			// poll and wait until enough TaskManagers are available
+			while (true) {
+				int numTaskManagersAvailable = 
+						resourceManager.getNumberOfRegisteredTaskManagers(addressAndId.leaderId()).get();
+
+				if (numTaskManagersAvailable >= numTaskManagersToWaitFor) {
+					break;
+				}
+				Thread.sleep(2);
+			}
+		}
+		finally {
+			try {
+				if (rmMasterListener != null) {
+					rmMasterListener.stop();
+				}
+			} catch (Exception e) {
+				LOG.warn("Error shutting down leader listener for ResourceManager");
+			}
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  running jobs
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
index 8ac8eba..7fffaee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -143,7 +143,7 @@ public class MiniClusterJobDispatcher {
 			if (!shutdown) {
 				shutdown = true;
 
-				LOG.info("Shutting down the dispatcher");
+				LOG.info("Shutting down the job dispatcher");
 
 				// in this shutdown code we copy the references to the stack first,
 				// to avoid concurrent modification

http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/OneTimeLeaderListenerFuture.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/OneTimeLeaderListenerFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/OneTimeLeaderListenerFuture.java
new file mode 100644
index 0000000..b0157d8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/OneTimeLeaderListenerFuture.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.leaderelection.LeaderAddressAndId;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+
+import java.util.UUID;
+
+/**
+ * A leader listener that exposes a future for the first leader notification.  
+ * 
+ * <p>The future can be obtained via the {@link #future()} method.
+ */
+public class OneTimeLeaderListenerFuture implements LeaderRetrievalListener {
+
+	private final FlinkCompletableFuture<LeaderAddressAndId> future;
+
+	public OneTimeLeaderListenerFuture() {
+		this.future = new FlinkCompletableFuture<>();
+	}
+
+	/**
+	 * Gets the future that is completed with the leader address and ID. 
+	 * @return The future.
+	 */
+	public FlinkFuture<LeaderAddressAndId> future() {
+		return future;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
+		future.complete(new LeaderAddressAndId(leaderAddress, leaderSessionID));
+	}
+
+	@Override
+	public void handleError(Exception exception) {
+		future.completeExceptionally(exception);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 145cc40..76b4a86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.LeaderIdMismatchException;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
@@ -502,6 +503,16 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		shutDownApplication(finalStatus, optionalDiagnostics);
 	}
 
+	@RpcMethod
+	public Integer getNumberOfRegisteredTaskManagers(UUID leaderSessionId) throws LeaderIdMismatchException {
+		if (this.leaderSessionId != null && this.leaderSessionId.equals(leaderSessionId)) {
+			return taskExecutors.size();
+		}
+		else {
+			throw new LeaderIdMismatchException(this.leaderSessionId, leaderSessionId);
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Testing methods
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 0a37bb9..8235ea7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -122,4 +122,12 @@ public interface ResourceManagerGateway extends RpcGateway {
 	 * @param optionalDiagnostics
 	 */
 	void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics);
+
+	/**
+	 * Gets the currently registered number of TaskManagers.
+	 * 
+	 * @param leaderSessionId The leader session ID with which to address the ResourceManager.
+	 * @return The future to the number of registered TaskManagers.
+	 */
+	Future<Integer> getNumberOfRegisteredTaskManagers(UUID leaderSessionId);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index 2cf2d4d..d9a1896 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -33,6 +33,10 @@ import org.junit.Test;
  */
 public class MiniClusterITCase extends TestLogger {
 
+	// ------------------------------------------------------------------------
+	//  Simple Job Running Tests
+	// ------------------------------------------------------------------------
+
 	@Test
 	public void runJobWithSingleRpcService() throws Exception {
 		MiniClusterConfiguration cfg = new MiniClusterConfiguration();
@@ -63,6 +67,10 @@ public class MiniClusterITCase extends TestLogger {
 		executeJob(miniCluster);
 	}
 
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
 	private static void executeJob(MiniCluster miniCluster) throws Exception {
 		miniCluster.start();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
index a0c128e..2007d35 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.api.environment;
 
 import org.apache.flink.annotation.Public;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -30,6 +29,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.streaming.api.graph.StreamGraph;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,8 +67,9 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
 					"The Flip6LocalStreamEnvironment cannot be used when submitting a program through a client, " +
 							"or running in a TestEnvironment context.");
 		}
-		
+
 		this.conf = config == null ? new Configuration() : config;
+		setParallelism(1);
 	}
 
 	/**
@@ -85,17 +86,12 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
 		StreamGraph streamGraph = getStreamGraph();
 		streamGraph.setJobName(jobName);
 
-		JobGraph jobGraph = streamGraph.getJobGraph();
+		// TODO - temp fix to enforce restarts due to a bug in the allocation protocol
+		streamGraph.getExecutionConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 5));
 
+		JobGraph jobGraph = streamGraph.getJobGraph();
 		jobGraph.setAllowQueuedScheduling(true);
 
-		// As jira FLINK-5140 described,
-		// we have to set restart strategy to handle NoResourceAvailableException.
-		ExecutionConfig executionConfig = new ExecutionConfig();
-		executionConfig.setRestartStrategy(
-			RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000));
-		jobGraph.setExecutionConfig(executionConfig);
-
 		Configuration configuration = new Configuration();
 		configuration.addAll(jobGraph.getJobConfiguration());
 		configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
@@ -105,7 +101,8 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
 
 		MiniClusterConfiguration cfg = new MiniClusterConfiguration(configuration);
 
-		// Currently we do not reuse slot anymore, so we need to sum all parallelism of vertices up.
+		// Currently we do not reuse slot anymore,
+		// so we need to sum up the parallelism of all vertices
 		int slotsCount = 0;
 		for (JobVertex jobVertex : jobGraph.getVertices()) {
 			slotsCount += jobVertex.getParallelism();
@@ -119,8 +116,10 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
 		MiniCluster miniCluster = new MiniCluster(cfg);
 		try {
 			miniCluster.start();
+			miniCluster.waitUntilTaskManagerRegistrationsComplete();
 			return miniCluster.runJobBlocking(jobGraph);
-		} finally {
+		}
+		finally {
 			transformations.clear();
 			miniCluster.shutdown();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java
new file mode 100644
index 0000000..a360d0e
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.environment;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+@SuppressWarnings("serial")
+public class LocalStreamEnvironmentITCase {
+
+	/**
+	 * This test verifies that the execution environment can be used to execute a
+	 * single job with multiple slots.
+	 */
+	@Test
+	public void testRunIsolatedJob() throws Exception {
+		Flip6LocalStreamEnvironment env = new Flip6LocalStreamEnvironment();
+		assertEquals(1, env.getParallelism());
+
+		addSmallBoundedJob(env, 3);
+		env.execute();
+	}
+
+	/**
+	 * This test verifies that the execution environment can be used to execute multiple
+	 * bounded streaming jobs after one another.
+	 */
+	@Test
+	public void testMultipleJobsAfterAnother() throws Exception {
+		Flip6LocalStreamEnvironment env = new Flip6LocalStreamEnvironment();
+
+		addSmallBoundedJob(env, 3);
+		env.execute();
+
+		addSmallBoundedJob(env, 5);
+		env.execute();
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static void addSmallBoundedJob(StreamExecutionEnvironment env, int parallelism) {
+		DataStream<Long> stream = env
+				.generateSequence(1, 100)
+					.setParallelism(parallelism)
+					.slotSharingGroup("group_1");
+
+		stream
+				.filter(new FilterFunction<Long>() {
+					@Override
+					public boolean filter(Long value) {
+						return false;
+					}
+				})
+					.setParallelism(parallelism)
+					.startNewChain()
+					.slotSharingGroup("group_2")
+
+				.print()
+					.setParallelism(parallelism);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
new file mode 100644
index 0000000..aedbb5c
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.testutils;
+
+/**
+ * A thread that additionally catches exceptions and offers a joining method that
+ * re-throws the exceptions.
+ * 
+ * <p>Rather than overriding {@link Thread#run()} (or supplying a {@link Runnable}), one
+ * needs to extend this class and implement the {@link #go()} method. That method may
+ * throw exceptions.
+ * 
+ * <p>Exceptions from the {@link #go()} method are caught and re-thrown when joining this
+ * thread via the {@link #sync()} method.
+ */
+public abstract class CheckedThread extends Thread {
+
+	/** The error thrown from the main work method */
+	private volatile Throwable error;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This method needs to be overridden to contain the main work logic.
+	 * It takes the role of {@link Thread#run()}, but should propagate exceptions.
+	 * 
+	 * @throws Exception The exceptions thrown here will be re-thrown in the {@link #sync()} method.
+	 */
+	public abstract void go() throws Exception;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This method is final - thread work should go into the {@link #go()} method instead.
+	 */
+	@Override
+	public final void run() {
+		try {
+			go();
+		}
+		catch (Throwable t) {
+			error = t;
+		}
+	}
+
+	/**
+	 * Waits until the thread is completed and checks whether any error occurred during
+	 * the execution.
+	 * 
+	 * <p>This method blocks like {@link #join()}, but performs an additional check for
+	 * exceptions thrown from the {@link #go()} method.
+	 */
+	public void sync() throws Exception {
+		super.join();
+
+		// propagate the error
+		if (error != null) {
+			if (error instanceof Error) {
+				throw (Error) error;
+			}
+			else if (error instanceof Exception) {
+				throw (Exception) error;
+			}
+			else {
+				throw new Exception(error.getMessage(), error);
+			}
+		}
+	}
+}


[09/10] flink git commit: [FLINK-4928] [yarn] Implement FLIP-6 YARN Application Master Runner

Posted by se...@apache.org.
[FLINK-4928] [yarn] Implement FLIP-6 YARN Application Master Runner


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e8293bcb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e8293bcb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e8293bcb

Branch: refs/heads/flip-6
Commit: e8293bcba588296656ae8425506bd2edf94a70e4
Parents: 8e57fba
Author: shuai.xus <sh...@alibaba-inc.com>
Authored: Thu Nov 3 16:24:47 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 5 02:49:44 2016 +0100

----------------------------------------------------------------------
 ...bstractYarnFlinkApplicationMasterRunner.java | 213 +++++++++++++
 .../yarn/YarnFlinkApplicationMasterRunner.java  | 316 +++++++++++++++++++
 2 files changed, 529 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e8293bcb/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
new file mode 100644
index 0000000..923694e
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Map;
+
+/**
+ * This class is the executable entry point for the YARN application master.
+ * It starts the actor system and the actors for {@link org.apache.flink.runtime.jobmaster.JobMaster}
+ * and {@link YarnResourceManager}.
+ *
+ * The JobMaster handles Flink job execution, while the YarnResourceManager handles container
+ * allocation and failure detection.
+ */
+public abstract class AbstractYarnFlinkApplicationMasterRunner {
+
+	/** Logger */
+	protected static final Logger LOG = LoggerFactory.getLogger(AbstractYarnFlinkApplicationMasterRunner.class);
+
+	/** The process environment variables */
+	protected static final Map<String, String> ENV = System.getenv();
+
+	/** The exit code returned if the initialization of the application master failed */
+	protected static final int INIT_ERROR_EXIT_CODE = 31;
+
+	/** The host name passed by env */
+	protected String appMasterHostname;
+
+	/**
+	 * The instance entry point for the YARN application master. Reads the settings passed
+	 * through the process environment, builds the Flink configuration, installs the security
+	 * context, and then calls the main work method
+	 * {@link #runApplicationMaster(org.apache.flink.configuration.Configuration)} as a
+	 * privileged action.
+	 *
+	 * @param args The command line arguments.
+	 * @return The process exit code; {@link #INIT_ERROR_EXIT_CODE} if anything thrown during
+	 *         the setup or by the secured runner reaches this method.
+	 */
+	protected int run(String[] args) {
+		try {
+			LOG.debug("All environment variables: {}", ENV);
+
+			final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+			// the message template uses %s, like the other precondition checks in this method
+			Preconditions.checkArgument(yarnClientUsername != null, "YARN client user name environment variable %s not set",
+				YarnConfigKeys.ENV_HADOOP_USER_NAME);
+
+			final String currDir = ENV.get(Environment.PWD.key());
+			Preconditions.checkArgument(currDir != null, "Current working directory variable (%s) not set", Environment.PWD.key());
+			LOG.debug("Current working directory: {}", currDir);
+
+			final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH);
+			LOG.debug("Remote keytab path obtained {}", remoteKeytabPath);
+
+			final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+			LOG.info("Remote keytab principal obtained {}", remoteKeytabPrincipal);
+
+			// if a remote keytab was announced, the local copy is expected in the working directory
+			String keytabPath = null;
+			if(remoteKeytabPath != null) {
+				File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
+				keytabPath = f.getAbsolutePath();
+				LOG.debug("Keytab path: {}", keytabPath);
+			}
+
+			UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+
+			LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
+					currentUser.getShortUserName(), yarnClientUsername );
+
+			SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration();
+
+			//To support Yarn Secure Integration Test Scenario
+			File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
+			if(krb5Conf.exists() && krb5Conf.canRead()) {
+				String krb5Path = krb5Conf.getAbsolutePath();
+				LOG.info("KRB5 Conf: {}", krb5Path);
+				org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+				conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+				conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
+				sc.setHadoopConfiguration(conf);
+			}
+
+			// Flink configuration
+			final Map<String, String> dynamicProperties =
+					FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
+			LOG.debug("YARN dynamic properties: {}", dynamicProperties);
+
+			final Configuration flinkConfig = createConfiguration(currDir, dynamicProperties);
+			// the keytab settings are only written when both the path and the principal are known
+			if(keytabPath != null && remoteKeytabPrincipal != null) {
+				flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
+				flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
+			}
+
+			SecurityContext.install(sc.setFlinkConfiguration(flinkConfig));
+
+			// Note that we use the "appMasterHostname" given by YARN here, to make sure
+			// we use the hostnames given by YARN consistently throughout akka.
+			// for akka "localhost" and "localhost.localdomain" are different actors.
+			this.appMasterHostname = ENV.get(Environment.NM_HOST.key());
+			Preconditions.checkArgument(appMasterHostname != null,
+					"ApplicationMaster hostname variable %s not set", Environment.NM_HOST.key());
+			LOG.info("YARN assigned hostname for application master: {}", appMasterHostname);
+
+			return SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
+				@Override
+				public Integer run() {
+					return runApplicationMaster(flinkConfig);
+				}
+			});
+
+		}
+		catch (Throwable t) {
+			// make sure that whatever went wrong ends up in the log
+			LOG.error("YARN Application Master initialization failed", t);
+			return INIT_ERROR_EXIT_CODE;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Core work method
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The main work method, must run as a privileged action.
+	 *
+	 * @param config The Flink configuration to run the application master with.
+	 * @return The return code for the Java process.
+	 */
+	protected abstract int runApplicationMaster(Configuration config);
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+	/**
+	 * @param baseDirectory  The working directory
+	 * @param additional Additional parameters
+	 * 
+	 * @return The configuration to be used by the TaskExecutors.
+	 */
+	private static Configuration createConfiguration(String baseDirectory, Map<String, String> additional) {
+		LOG.info("Loading config from directory {}.", baseDirectory);
+
+		Configuration configuration = GlobalConfiguration.loadConfiguration(baseDirectory);
+
+		configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, baseDirectory);
+
+		// add dynamic properties to JobManager configuration.
+		for (Map.Entry<String, String> property : additional.entrySet()) {
+			configuration.setString(property.getKey(), property.getValue());
+		}
+
+		// override zookeeper namespace with user cli argument (if provided)
+		String cliZKNamespace = ENV.get(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE);
+		if (cliZKNamespace != null && !cliZKNamespace.isEmpty()) {
+			configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, cliZKNamespace);
+		}
+
+		// if a web monitor shall be started, set the port to random binding
+		if (configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
+			configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
+		}
+
+		// if the user has set the deprecated YARN-specific config keys, we add the 
+		// corresponding generic config keys instead. that way, later code needs not
+		// deal with deprecated config keys
+
+		BootstrapTools.substituteDeprecatedConfigKey(configuration,
+			ConfigConstants.YARN_HEAP_CUTOFF_RATIO,
+			ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO);
+
+		BootstrapTools.substituteDeprecatedConfigKey(configuration,
+			ConfigConstants.YARN_HEAP_CUTOFF_MIN,
+			ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN);
+
+		BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
+			ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX,
+			ConfigConstants.CONTAINERIZED_MASTER_ENV_PREFIX);
+
+		BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
+			ConfigConstants.YARN_TASK_MANAGER_ENV_PREFIX,
+			ConfigConstants.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);
+
+		return configuration;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e8293bcb/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
new file mode 100644
index 0000000..e58c77e
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.ObjectInputStream;
+
+/**
+ * This class is the executable entry point for the YARN application master.
+ * It starts the actor system and the actors for {@link org.apache.flink.runtime.jobmaster.JobManagerRunner}
+ * and {@link org.apache.flink.yarn.YarnResourceManager}.
+ *
+ * The JobManagerRunner starts a {@link org.apache.flink.runtime.jobmaster.JobMaster}.
+ * The JobMaster handles Flink job execution, while the YarnResourceManager handles container
+ * allocation and failure detection.
+ */
+public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicationMasterRunner
+		implements OnCompletionActions, FatalErrorHandler {
+
+	/** Logger */
+	protected static final Logger LOG = LoggerFactory.getLogger(YarnFlinkApplicationMasterRunner.class);
+
+	/** The job graph file path */
+	private static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
+
+	/** The lock to guard startup / shutdown / manipulation methods */
+	private final Object lock = new Object();
+
+	@GuardedBy("lock")
+	private MetricRegistry metricRegistry;
+
+	@GuardedBy("lock")
+	private HighAvailabilityServices haServices;
+
+	@GuardedBy("lock")
+	private RpcService commonRpcService;
+
+	@GuardedBy("lock")
+	private ResourceManager resourceManager;
+
+	@GuardedBy("lock")
+	private JobManagerRunner jobManagerRunner;
+
+	@GuardedBy("lock")
+	private JobGraph jobGraph;
+
+	// ------------------------------------------------------------------------
+	//  Program entry point
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The entry point for the YARN application master. Logs the environment, registers the
+	 * signal handler and the shutdown safeguard, runs the application master, and exits the
+	 * JVM with the code it returned.
+	 *
+	 * @param args The command line arguments.
+	 */
+	public static void main(String[] args) {
+		EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster runner", args);
+		SignalHandler.register(LOG);
+		JvmShutdownSafeguard.installAsShutdownHook(LOG);
+
+		// the runner's return code becomes the exit code of the process
+		final YarnFlinkApplicationMasterRunner runner = new YarnFlinkApplicationMasterRunner();
+		System.exit(runner.run(args));
+	}
+
+	/**
+	 * Creates and starts the services of the application master, then blocks until the
+	 * resource manager has terminated.
+	 *
+	 * @param config The Flink configuration all services are created from.
+	 * @return 0 once the resource manager has terminated, or {@link #INIT_ERROR_EXIT_CODE} if
+	 *         anything was thrown while starting up or waiting.
+	 */
+	@Override
+	protected int runApplicationMaster(Configuration config) {
+		try {
+			// the rpc service is started on a port taken from the port range in the config
+			final String portRange = config.getString(
+					ConfigConstants.YARN_APPLICATION_MASTER_PORT,
+					ConfigConstants.DEFAULT_YARN_JOB_MANAGER_PORT);
+
+			synchronized (lock) {
+				// ---- (1) create common services
+				haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config);
+				metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+				commonRpcService = createRpcService(config, appMasterHostname, portRange);
+
+				// ---- (2) init resource manager -------
+				resourceManager = createResourceManager(config);
+
+				// ---- (3) init job master parameters
+				jobManagerRunner = createJobManagerRunner(config);
+
+				// ---- (4) start the resource manager and job manager runner:
+				resourceManager.start();
+				LOG.debug("YARN Flink Resource Manager started");
+
+				jobManagerRunner.start();
+				LOG.debug("Job Manager Runner started");
+
+				// ---- (5) start the web monitor
+				// TODO: add web monitor
+			}
+
+			// everything is started: block until the resource manager terminates
+			// or the process is killed
+			resourceManager.getTerminationFuture().get();
+			LOG.info("YARN Application Master finished");
+			return 0;
+		}
+		catch (Throwable failure) {
+			// make sure that whatever went wrong ends up in the log
+			LOG.error("YARN Application Master initialization failed", failure);
+			shutdown(ApplicationStatus.FAILED, failure.getMessage());
+			return INIT_ERROR_EXIT_CODE;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	protected RpcService createRpcService(
+			Configuration configuration,
+			String bindAddress,
+			String portRange) throws Exception{
+		ActorSystem actorSystem = BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG);
+		FiniteDuration duration = AkkaUtils.getTimeout(configuration);
+		return new AkkaRpcService(actorSystem, Time.of(duration.length(), duration.unit()));
+	}
+
+	/**
+	 * Creates the YARN resource manager on top of the already created common services
+	 * (rpc service, high-availability services, metric registry). This runner is handed
+	 * over as the last constructor argument.
+	 *
+	 * @param config The Flink configuration.
+	 * @return The new, not yet started, resource manager.
+	 * @throws ConfigurationException If the resource manager configuration cannot be derived from the config.
+	 */
+	private ResourceManager createResourceManager(Configuration config) throws ConfigurationException {
+		final ResourceManagerConfiguration rmConfig = ResourceManagerConfiguration.fromConfiguration(config);
+		final SlotManagerFactory slotManagers = new DefaultSlotManager.Factory();
+		final JobLeaderIdService leaderIds = new JobLeaderIdService(haServices);
+
+		return new YarnResourceManager(
+				config, ENV, commonRpcService, rmConfig, haServices, slotManagers, metricRegistry, leaderIds, this);
+	}
+
+	/**
+	 * Loads the job graph, marks the job as running at the high-availability services, and
+	 * creates the runner for it. This runner is handed over for the last two constructor
+	 * arguments.
+	 *
+	 * @param config The Flink configuration.
+	 * @return The new, not yet started, job manager runner.
+	 * @throws Exception If the job graph cannot be loaded, the job cannot be registered, or
+	 *                   the runner cannot be created.
+	 */
+	private JobManagerRunner createJobManagerRunner(Configuration config) throws Exception {
+		// the job graph currently comes from the local resources
+		//TODO: generate the job graph from user's jar
+		jobGraph = loadJobGraph(config);
+
+		// the job must be marked as running in the HA services before the runner exists,
+		// so that the JobManager leader will recognize that it has work to do
+		try {
+			haServices.getRunningJobsRegistry().setJobRunning(jobGraph.getJobID());
+		}
+		catch (Throwable cause) {
+			throw new JobExecutionException(
+					jobGraph.getJobID(), "Could not register the job at the high-availability services", cause);
+		}
+
+		// now the JobManagerRunner
+		return new JobManagerRunner(jobGraph, config, commonRpcService, haServices, this, this);
+	}
+
+	/**
+	 * Stops the services of the application master in this order: job manager runner,
+	 * resource manager, rpc service, high-availability services, metric registry.
+	 *
+	 * <p>Services that were never created are skipped. Every step is best effort: a failure
+	 * is logged and the remaining steps still run.
+	 *
+	 * @param status The final application status handed to the resource manager.
+	 * @param msg A diagnostics message handed to the resource manager, may be null.
+	 */
+	protected void shutdown(ApplicationStatus status, String msg) {
+		synchronized (lock) {
+			if (jobManagerRunner != null) {
+				try {
+					jobManagerRunner.shutdown();
+				} catch (Throwable tt) {
+					LOG.warn("Failed to stop the JobManagerRunner", tt);
+				}
+			}
+			if (resourceManager != null) {
+				try {
+					resourceManager.shutDownCluster(status, msg);
+				} catch (Throwable tt) {
+					LOG.warn("Failed to shut down the cluster", tt);
+				}
+				// separate try block: the resource manager must be stopped even if
+				// shutting down the cluster failed
+				try {
+					resourceManager.shutDown();
+				} catch (Throwable tt) {
+					LOG.warn("Failed to stop the ResourceManager", tt);
+				}
+			}
+			if (commonRpcService != null) {
+				try {
+					commonRpcService.stopService();
+				} catch (Throwable tt) {
+					LOG.error("Error shutting down resource manager rpc service", tt);
+				}
+			}
+			if (haServices != null) {
+				try {
+					haServices.shutdown();
+				} catch (Throwable tt) {
+					LOG.warn("Failed to stop the HA service", tt);
+				}
+			}
+			if (metricRegistry != null) {
+				try {
+					metricRegistry.shutdown();
+				} catch (Throwable tt) {
+					LOG.warn("Failed to stop the metrics registry", tt);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Loads the serialized job graph from the local file whose path is configured under
+	 * {@link #JOB_GRAPH_FILE_PATH} (default {@code "job.graph"}).
+	 *
+	 * @param config The configuration that names the job graph file.
+	 * @return The deserialized job graph, never null.
+	 * @throws Exception If the path does not denote a regular file, or if the file cannot be
+	 *                   read or deserialized into a job graph.
+	 */
+	private static JobGraph loadJobGraph(Configuration config) throws Exception {
+		JobGraph jg = null;
+		String jobGraphFile = config.getString(JOB_GRAPH_FILE_PATH, "job.graph");
+		if (jobGraphFile != null) {
+			File fp = new File(jobGraphFile);
+			if (fp.isFile()) {
+				// try-with-resources closes both streams, also when deserialization fails
+				try (FileInputStream input = new FileInputStream(fp);
+						ObjectInputStream obInput = new ObjectInputStream(input)) {
+					jg = (JobGraph) obInput.readObject();
+				}
+			}
+		}
+		if (jg == null) {
+			throw new Exception("Fail to load job graph " + jobGraphFile);
+		}
+		return jg;
+	}
+
+	//-------------------------------------------------------------------------------------
+	// Fatal error handler
+	//-------------------------------------------------------------------------------------
+
+	/**
+	 * Logs the fatal error and shuts all services down with the application status FAILED,
+	 * using the exception message as diagnostics.
+	 *
+	 * @param exception The fatal error that was encountered.
+	 */
+	@Override
+	public void onFatalError(Throwable exception) {
+		LOG.error("Encountered fatal error.", exception);
+
+		final String diagnostics = exception.getMessage();
+		shutdown(ApplicationStatus.FAILED, diagnostics);
+	}
+
+	//----------------------------------------------------------------------------------------------
+	// Result and error handling methods
+	//----------------------------------------------------------------------------------------------
+
+	/**
+	 * Job completion notification triggered by JobManager. Shuts all services down with the
+	 * application status SUCCEEDED and no diagnostics message.
+	 *
+	 * @param result The result of the job execution; it is not used here.
+	 */
+	@Override
+	public void jobFinished(JobExecutionResult result) {
+		shutdown(ApplicationStatus.SUCCEEDED, null);
+	}
+
+	/**
+	 * Job completion notification triggered by JobManager. Shuts all services down with the
+	 * application status FAILED, using the message of the cause as diagnostics.
+	 *
+	 * @param cause The failure that ended the job.
+	 */
+	@Override
+	public void jobFailed(Throwable cause) {
+		shutdown(ApplicationStatus.FAILED, cause.getMessage());
+	}
+
+	/**
+	 * Job completion notification triggered by self. Shuts all services down with the
+	 * application status UNKNOWN and no diagnostics message.
+	 *
+	 * NOTE(review): judging by the method name, this presumably reports that the job was
+	 * finished by another party; confirm against OnCompletionActions.
+	 */
+	@Override
+	public void jobFinishedByOther() {
+		shutdown(ApplicationStatus.UNKNOWN, null);
+	}
+}


[10/10] flink git commit: [FLINK-4929] [yarn] Implement FLIP-6 YARN TaskExecutor Runner

Posted by se...@apache.org.
[FLINK-4929] [yarn] Implement FLIP-6 YARN TaskExecutor Runner

Summary: Implement FLIP-6 YARN TaskExecutor Runner

Test Plan: NA

Reviewers: biao.liub

Differential Revision: http://phabricator.taobao.net/D6564


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/55e94c3c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/55e94c3c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/55e94c3c

Branch: refs/heads/flip-6
Commit: 55e94c3c655dc73beaebfd13b83531194f0ae539
Parents: e8293bc
Author: shuai.xus <sh...@alibaba-inc.com>
Authored: Wed Nov 23 18:00:07 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 5 02:49:44 2016 +0100

----------------------------------------------------------------------
 .../runtime/taskexecutor/TaskManagerRunner.java |   6 +
 .../flink/yarn/YarnTaskExecutorRunner.java      | 257 +++++++++++++++++++
 2 files changed, 263 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/55e94c3c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 1145a46..3500f6d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
@@ -151,6 +152,11 @@ public class TaskManagerRunner implements FatalErrorHandler {
 		}
 	}
 
+	/**
+	 * Exports the termination future of the task manager, so that the caller can tell when
+	 * it has terminated.
+	 *
+	 * @return The future returned by the task manager's {@code getTerminationFuture()}.
+	 */
+	public Future<Void> getTerminationFuture() {
+		return taskManager.getTerminationFuture();
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  FatalErrorHandler methods
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/55e94c3c/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
new file mode 100644
index 0000000..d9912eb
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Map;
+
+/**
+ * This class is the executable entry point for running a TaskExecutor in a YARN container.
+ */
+public class YarnTaskExecutorRunner {
+
+	/** Logger */
+	protected static final Logger LOG = LoggerFactory.getLogger(YarnTaskExecutorRunner.class);
+
+	/** The process environment variables */
+	private static final Map<String, String> ENV = System.getenv();
+
+	/** The exit code returned if the initialization of the yarn task executor runner failed */
+	private static final int INIT_ERROR_EXIT_CODE = 31;
+
+	private MetricRegistry metricRegistry;
+
+	private HighAvailabilityServices haServices;
+
+	private RpcService taskExecutorRpcService;
+
+	private TaskManagerRunner taskManagerRunner;
+
+	// ------------------------------------------------------------------------
+	//  Program entry point
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The entry point for the YARN task executor runner. Logs the environment, registers the
+	 * signal handler and the shutdown safeguard, runs the task executor, and exits the JVM
+	 * with the code it returned.
+	 *
+	 * @param args The command line arguments.
+	 */
+	public static void main(String[] args) {
+		EnvironmentInformation.logEnvironmentInfo(LOG, "YARN TaskExecutor runner", args);
+		SignalHandler.register(LOG);
+		JvmShutdownSafeguard.installAsShutdownHook(LOG);
+
+		// the runner's return code becomes the exit code of the process
+		final YarnTaskExecutorRunner runner = new YarnTaskExecutorRunner();
+		System.exit(runner.run(args));
+	}
+
+	/**
+	 * The instance entry point for the YARN task executor. Reads the settings passed through
+	 * the process environment, loads the Flink configuration from the working directory,
+	 * installs the security context, and then calls the main work method
+	 * {@link #runTaskExecutor(org.apache.flink.configuration.Configuration)} as a
+	 * privileged action.
+	 *
+	 * @param args The command line arguments.
+	 * @return The process exit code; {@link #INIT_ERROR_EXIT_CODE} if anything thrown during
+	 *         the setup or by the secured runner reaches this method.
+	 */
+	protected int run(String[] args) {
+		try {
+			LOG.debug("All environment variables: {}", ENV);
+
+			final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+			final String localDirs = ENV.get(Environment.LOCAL_DIRS.key());
+			LOG.info("Current working/local Directory: {}", localDirs);
+
+			final String currDir = ENV.get(Environment.PWD.key());
+			LOG.info("Current working Directory: {}", currDir);
+
+			final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH);
+			LOG.info("TM: remote keytab path obtained {}", remoteKeytabPath);
+
+			final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+			LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
+
+			final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir);
+			FileSystem.setDefaultScheme(configuration);
+
+			// configure local directory: YARN's local dirs are only used when the Flink
+			// config does not name temp directories itself
+			String flinkTempDirs = configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null);
+			if (flinkTempDirs == null) {
+				LOG.info("Setting directories for temporary file " + localDirs);
+				configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, localDirs);
+			}
+			else {
+				LOG.info("Overriding YARN's temporary file directories with those " +
+						"specified in the Flink config: " + flinkTempDirs);
+			}
+
+			// tell akka to die in case of an error
+			configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true);
+
+			// if a remote keytab was announced, the local copy is expected in the working directory
+			String keytabPath = null;
+			if(remoteKeytabPath != null) {
+				File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
+				keytabPath = f.getAbsolutePath();
+				LOG.info("keytab path: {}", keytabPath);
+			}
+
+			UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+
+			LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
+					currentUser.getShortUserName(), yarnClientUsername);
+
+			SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration();
+
+			//To support Yarn Secure Integration Test Scenario
+			File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
+			if(krb5Conf.exists() && krb5Conf.canRead()) {
+				String krb5Path = krb5Conf.getAbsolutePath();
+				LOG.info("KRB5 Conf: {}", krb5Path);
+				org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+				conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+				conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
+				sc.setHadoopConfiguration(conf);
+			}
+
+			// the keytab settings are only written when both the path and the principal are known
+			if(keytabPath != null && remoteKeytabPrincipal != null) {
+				configuration.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
+				configuration.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
+			}
+
+			SecurityContext.install(sc.setFlinkConfiguration(configuration));
+
+			return SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
+				@Override
+				public Integer run() {
+					return runTaskExecutor(configuration);
+				}
+			});
+
+		}
+		catch (Throwable t) {
+			// make sure that whatever went wrong ends up in the log; the message names the
+			// task executor runner so it is not mistaken for an application master failure
+			LOG.error("YARN TaskExecutor runner initialization failed", t);
+			return INIT_ERROR_EXIT_CODE;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Core work method
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The main work method, must run as a privileged action. Creates the high-availability
+	 * services, the metric registry, the rpc service and the task manager runner, starts the
+	 * runner, and then blocks until its termination future completes.
+	 *
+	 * @param config The Flink configuration; the task manager hostname is written into it
+	 *               when the node id environment variable is set.
+	 * @return The return code for the Java process: 0 after the task manager runner has
+	 *         terminated, or {@link #INIT_ERROR_EXIT_CODE} if anything was thrown.
+	 */
+	protected int runTaskExecutor(Configuration config) {
+
+		try {
+			// ---- (1) create common services
+			// first get the ResourceID; on YARN the resource id is the container id.
+			final String containerId = ENV.get(YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID);
+			Preconditions.checkArgument(containerId != null,
+					"ContainerId variable %s not set", YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID);
+			// use the hostname passed by the job manager through the environment, if there is one
+			final String taskExecutorHostname = ENV.get(YarnResourceManager.ENV_FLINK_NODE_ID);
+			if (taskExecutorHostname != null) {
+				config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, taskExecutorHostname);
+			}
+
+			ResourceID resourceID = new ResourceID(containerId);
+			LOG.info("YARN assigned resource id {} for the task executor.", resourceID.toString());
+
+			haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config);
+			metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+
+			// ---- (2) init task manager runner -------
+			taskExecutorRpcService = TaskManagerRunner.createRpcService(config, haServices);
+			taskManagerRunner = new TaskManagerRunner(config, resourceID, taskExecutorRpcService, haServices, metricRegistry);
+
+			// ---- (3) start the task manager runner
+			taskManagerRunner.start();
+			LOG.debug("YARN task executor started");
+
+			taskManagerRunner.getTerminationFuture().get();
+			// the wait above is over: the termination future has completed, so shut the services down
+			LOG.info("YARN task manager runner finished");
+			shutdown();
+		}
+		catch (Throwable t) {
+			// make sure that whatever went wrong ends up in the log
+			LOG.error("YARN task executor initialization failed", t);
+			shutdown();
+			return INIT_ERROR_EXIT_CODE;
+		}
+
+		return 0;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+
+	/**
+	 * Stops the task executor RPC service, the high-availability services and the metric registry,
+	 * in that order. Services that were never created (still {@code null}) are skipped, and a
+	 * failure while stopping one of them is only logged, so the remaining ones are still stopped.
+	 */
+	protected void shutdown() {
+		if (taskExecutorRpcService != null) {
+			try {
+				taskExecutorRpcService.stopService();
+			} catch (Throwable tt) {
+				// this runner hosts the task executor, so name that service in the message
+				LOG.error("Error shutting down task executor rpc service", tt);
+			}
+		}
+		if (haServices != null) {
+			try {
+				haServices.shutdown();
+			} catch (Throwable tt) {
+				LOG.warn("Failed to stop the HA service", tt);
+			}
+		}
+		if (metricRegistry != null) {
+			try {
+				metricRegistry.shutdown();
+			} catch (Throwable tt) {
+				LOG.warn("Failed to stop the metrics registry", tt);
+			}
+		}
+	}
+
+}


[04/10] flink git commit: [FLINK-5141] [streaming api] Implement LocalStreamEnvironment for new mini cluster.

Posted by se...@apache.org.
[FLINK-5141] [streaming api] Implement LocalStreamEnvironment for new mini cluster.

This closes #2877


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c0086b57
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c0086b57
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c0086b57

Branch: refs/heads/flip-6
Commit: c0086b57eec63bab627383205eed2ff8636c5394
Parents: 4afcc4a
Author: biao.liub <bi...@alibaba-inc.com>
Authored: Wed Nov 23 17:02:11 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 5 02:49:43 2016 +0100

----------------------------------------------------------------------
 .../Flip6LocalStreamEnvironment.java            | 128 +++++++++++++++++++
 1 file changed, 128 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c0086b57/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
new file mode 100644
index 0000000..a0c128e
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.environment;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Flip6LocalStreamEnvironment is a StreamExecutionEnvironment that runs the program locally,
+ * multi-threaded, in the JVM where the environment is instantiated. It spawns an embedded
+ * Flink cluster in the background and executes the program on that cluster.
+ *
+ * <p>When this environment is instantiated, it uses a default parallelism of {@code 1}. The default
+ * parallelism can be set via {@link #setParallelism(int)}.
+ */
+@Public
+public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
+
+	private static final Logger LOG = LoggerFactory.getLogger(Flip6LocalStreamEnvironment.class);
+
+	/** The configuration to use for the mini cluster. */
+	private final Configuration conf;
+
+	/**
+	 * Creates a new mini cluster stream environment that uses the default configuration.
+	 */
+	public Flip6LocalStreamEnvironment() {
+		this(null);
+	}
+
+	/**
+	 * Creates a new mini cluster stream environment that configures its local executor with the given configuration.
+	 *
+	 * @param config The configuration used to configure the local executor; {@code null} is replaced by an empty one.
+	 */
+	public Flip6LocalStreamEnvironment(Configuration config) {
+		if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
+			throw new InvalidProgramException(
+					"The Flip6LocalStreamEnvironment cannot be used when submitting a program through a client, " +
+							"or running in a TestEnvironment context.");
+		}
+
+		this.conf = config == null ? new Configuration() : config;
+	}
+
+	/**
+	 * Translates the streaming program into a JobGraph with the given name and runs it on a
+	 * {@link MiniCluster} that is started for this call and shut down again afterwards.
+	 *
+	 * @param jobName The name of the job.
+	 * @return The result of the job execution, containing elapsed time and accumulators.
+	 * @throws Exception If translating the program, starting the cluster or running the job fails.
+	 */
+	@Override
+	public JobExecutionResult execute(String jobName) throws Exception {
+		// transform the streaming program into a JobGraph
+		StreamGraph streamGraph = getStreamGraph();
+		streamGraph.setJobName(jobName);
+
+		JobGraph jobGraph = streamGraph.getJobGraph();
+
+		jobGraph.setAllowQueuedScheduling(true);
+
+		// As described in FLINK-5140, a restart strategy is needed to handle NoResourceAvailableException.
+		// It is set on the environment's own ExecutionConfig instead of on a fresh instance, so the
+		// config installed on the JobGraph still carries what was configured through getConfig().
+		ExecutionConfig executionConfig = getConfig();
+		executionConfig.setRestartStrategy(
+			RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000));
+		jobGraph.setExecutionConfig(executionConfig);
+
+		Configuration configuration = new Configuration();
+		configuration.addAll(jobGraph.getJobConfiguration());
+		configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
+
+		// add (and override) the settings with what the user defined
+		configuration.addAll(this.conf);
+
+		MiniClusterConfiguration cfg = new MiniClusterConfiguration(configuration);
+
+		// Slots are currently not reused, so the parallelism of all vertices is summed up.
+		int slotsCount = 0;
+		for (JobVertex jobVertex : jobGraph.getVertices()) {
+			slotsCount += jobVertex.getParallelism();
+		}
+		cfg.setNumTaskManagerSlots(slotsCount);
+
+		LOG.info("Running job on local embedded Flink mini cluster");
+
+		MiniCluster miniCluster = new MiniCluster(cfg);
+		try {
+			miniCluster.start();
+			return miniCluster.runJobBlocking(jobGraph);
+		} finally {
+			// runs on success and on failure: drop the recorded transformations, stop the cluster
+			transformations.clear();
+			miniCluster.shutdown();
+		}
+	}
+}