You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/08/20 11:15:57 UTC
[2/3] git commit: [YARN] properly set diagnostics messages on failures
[YARN] properly set diagnostics messages on failures
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ae32c187
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ae32c187
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ae32c187
Branch: refs/heads/master
Commit: ae32c18769347022e456f93e4a84d907a2a465bd
Parents: 8594905
Author: Robert Metzger <me...@web.de>
Authored: Mon Jun 16 17:56:59 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Wed Aug 20 09:36:24 2014 +0200
----------------------------------------------------------------------
docs/config.md | 5 +
.../apache/flink/yarn/ApplicationMaster.java | 323 -----------
.../main/java/org/apache/flink/yarn/Client.java | 268 ++++++---
.../apache/flink/yarn/ClientMasterControl.java | 128 +++++
.../main/java/org/apache/flink/yarn/Utils.java | 2 +-
.../flink/yarn/YarnTaskManagerRunner.java | 68 ---
.../flink/yarn/appMaster/ApplicationMaster.java | 554 +++++++++++++++++++
.../yarn/appMaster/YarnTaskManagerRunner.java | 68 +++
.../flink/yarn/rpc/ApplicationMasterStatus.java | 93 ++++
.../yarn/rpc/YARNClientMasterProtocol.java | 63 +++
.../client/minicluster/NepheleMiniCluster.java | 2 +-
.../flink/configuration/ConfigConstants.java | 9 +
.../instance/DefaultInstanceManager.java | 2 +-
.../flink/runtime/instance/InstanceManager.java | 4 +-
.../flink/runtime/jobmanager/JobManager.java | 4 +-
.../jobmanager/web/JobmanagerInfoServlet.java | 2 +-
.../jobmanager/web/LogfileInfoServlet.java | 73 ++-
.../runtime/jobmanager/web/WebInfoServer.java | 51 +-
.../scheduler/TestInstanceManager.java | 2 +-
tools/.gitignore | 1 +
20 files changed, 1175 insertions(+), 547 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/docs/config.md
----------------------------------------------------------------------
diff --git a/docs/config.md b/docs/config.md
index ddc579b..4a02b0f 100644
--- a/docs/config.md
+++ b/docs/config.md
@@ -249,3 +249,8 @@ sample that the compiler takes for delimited inputs. If the length of a single
sample exceeds this value (possible because of misconfiguration of the parser),
the sampling aborts. This value can be overridden for a specific input with the
input format's parameters (DEFAULT: 2097152 (= 2 MiBytes)).
+
+# YARN
+
+- `yarn.am.rpc.port`: The port opened by the Application Master (AM) so that
+the YARN client can connect to the AM's RPC service. (DEFAULT: Port 10245)
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ApplicationMaster.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ApplicationMaster.java
deleted file mode 100644
index 40635dc..0000000
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ApplicationMaster.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-
-package org.apache.flink.yarn;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.Writer;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedAction;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
-import org.apache.hadoop.yarn.client.api.NMClient;
-import org.apache.hadoop.yarn.util.Records;
-
-import com.google.common.base.Preconditions;
-
-public class ApplicationMaster {
-
- private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
-
- private void run() throws Exception {
- //Utils.logFilesInCurrentDirectory(LOG);
- // Initialize clients to ResourceManager and NodeManagers
- Configuration conf = Utils.initializeYarnConfiguration();
- FileSystem fs = FileSystem.get(conf);
- Map<String, String> envs = System.getenv();
- final String currDir = envs.get(Environment.PWD.key());
- final String logDirs = envs.get(Environment.LOG_DIRS.key());
- final String ownHostname = envs.get(Environment.NM_HOST.key());
- final String appId = envs.get(Client.ENV_APP_ID);
- final String clientHomeDir = envs.get(Client.ENV_CLIENT_HOME_DIR);
- final String applicationMasterHost = envs.get(Environment.NM_HOST.key());
- final String remoteFlinkJarPath = envs.get(Client.FLINK_JAR_PATH);
- final String shipListString = envs.get(Client.ENV_CLIENT_SHIP_FILES);
- final String yarnClientUsername = envs.get(Client.ENV_CLIENT_USERNAME);
- final int taskManagerCount = Integer.valueOf(envs.get(Client.ENV_TM_COUNT));
- final int memoryPerTaskManager = Integer.valueOf(envs.get(Client.ENV_TM_MEMORY));
- final int coresPerTaskManager = Integer.valueOf(envs.get(Client.ENV_TM_CORES));
-
- int heapLimit = Utils.calculateHeapSize(memoryPerTaskManager);
-
- if(currDir == null) {
- throw new RuntimeException("Current directory unknown");
- }
- if(ownHostname == null) {
- throw new RuntimeException("Own hostname ("+Environment.NM_HOST+") not set.");
- }
- LOG.info("Working directory "+currDir);
-
- // load Flink configuration.
- Utils.getFlinkConfiguration(currDir);
-
- final String localWebInterfaceDir = currDir+"/resources/"+ConfigConstants.DEFAULT_JOB_MANAGER_WEB_PATH_NAME;
-
- // Update yaml conf -> set jobManager address to this machine's address.
- FileInputStream fis = new FileInputStream(currDir+"/flink-conf.yaml");
- BufferedReader br = new BufferedReader(new InputStreamReader(fis));
- Writer output = new BufferedWriter(new FileWriter(currDir+"/flink-conf-modified.yaml"));
- String line ;
- while ( (line = br.readLine()) != null) {
- if(line.contains(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY)) {
- output.append(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY+": "+ownHostname+"\n");
- } else if(line.contains(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY)) {
- output.append(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY+": "+"\n");
- } else {
- output.append(line+"\n");
- }
- }
- // just to make sure.
- output.append(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY+": "+ownHostname+"\n");
- output.append(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY+": "+localWebInterfaceDir+"\n");
- output.append(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY+": "+logDirs+"\n");
- output.close();
- br.close();
- File newConf = new File(currDir+"/flink-conf-modified.yaml");
- if(!newConf.exists()) {
- LOG.warn("modified yaml does not exist!");
- }
-
- Utils.copyJarContents("resources/"+ConfigConstants.DEFAULT_JOB_MANAGER_WEB_PATH_NAME,
- ApplicationMaster.class.getProtectionDomain().getCodeSource().getLocation().getPath());
-
- JobManager jm;
- {
- String pathToNepheleConfig = currDir+"/flink-conf-modified.yaml";
- String[] args = {"-executionMode","cluster", "-configDir", pathToNepheleConfig};
-
- // start the job manager
- jm = JobManager.initialize( args );
-
- // Start info server for jobmanager
- jm.startInfoServer();
- }
-
- AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
- rmClient.init(conf);
- rmClient.start();
-
- NMClient nmClient = NMClient.createNMClient();
- nmClient.init(conf);
- nmClient.start();
-
- // Register with ResourceManager
- LOG.info("registering ApplicationMaster");
- rmClient.registerApplicationMaster(applicationMasterHost, 0, "http://"+applicationMasterHost+":"+GlobalConfiguration.getString(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, "undefined"));
-
- // Priority for worker containers - priorities are intra-application
- Priority priority = Records.newRecord(Priority.class);
- priority.setPriority(0);
-
- // Resource requirements for worker containers
- Resource capability = Records.newRecord(Resource.class);
- capability.setMemory(memoryPerTaskManager);
- capability.setVirtualCores(coresPerTaskManager);
-
- // Make container requests to ResourceManager
- for (int i = 0; i < taskManagerCount; ++i) {
- ContainerRequest containerAsk = new ContainerRequest(capability,
- null, null, priority);
- LOG.info("Requesting TaskManager container " + i);
- rmClient.addContainerRequest(containerAsk);
- }
-
- LocalResource flinkJar = Records.newRecord(LocalResource.class);
- LocalResource flinkConf = Records.newRecord(LocalResource.class);
-
- // register Flink Jar with remote HDFS
- final Path remoteJarPath = new Path(remoteFlinkJarPath);
- Utils.registerLocalResource(fs, remoteJarPath, flinkJar);
-
- // register conf with local fs.
- Path remoteConfPath = Utils.setupLocalResource(conf, fs, appId, new Path("file://"+currDir+"/flink-conf-modified.yaml"), flinkConf, new Path(clientHomeDir));
- LOG.info("Prepared localresource for modified yaml: "+flinkConf);
-
-
- boolean hasLog4j = new File(currDir+"/log4j.properties").exists();
- // prepare the files to ship
- LocalResource[] remoteShipRsc = null;
- String[] remoteShipPaths = shipListString.split(",");
- if(!shipListString.isEmpty()) {
- remoteShipRsc = new LocalResource[remoteShipPaths.length];
- { // scope for i
- int i = 0;
- for(String remoteShipPathStr : remoteShipPaths) {
- if(remoteShipPathStr == null || remoteShipPathStr.isEmpty()) {
- continue;
- }
- remoteShipRsc[i] = Records.newRecord(LocalResource.class);
- Path remoteShipPath = new Path(remoteShipPathStr);
- Utils.registerLocalResource(fs, remoteShipPath, remoteShipRsc[i]);
- i++;
- }
- }
- }
-
- // respect custom JVM options in the YAML file
- final String javaOpts = GlobalConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
-
- // Obtain allocated containers and launch
- int allocatedContainers = 0;
- int completedContainers = 0;
- while (allocatedContainers < taskManagerCount) {
- AllocateResponse response = rmClient.allocate(0);
- for (Container container : response.getAllocatedContainers()) {
- LOG.info("Got new Container for TM "+container.getId()+" on host "+container.getNodeId().getHost());
- ++allocatedContainers;
-
- // Launch container by create ContainerLaunchContext
- ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
-
- String tmCommand = "$JAVA_HOME/bin/java -Xmx"+heapLimit+"m " + javaOpts ;
- if(hasLog4j) {
- tmCommand += " -Dlog.file=\""+ApplicationConstants.LOG_DIR_EXPANSION_VAR +"/taskmanager-log4j.log\" -Dlog4j.configuration=file:log4j.properties";
- }
- tmCommand += " org.apache.flink.yarn.YarnTaskManagerRunner -configDir . "
- + " 1>"
- + ApplicationConstants.LOG_DIR_EXPANSION_VAR
- + "/taskmanager-stdout.log"
- + " 2>"
- + ApplicationConstants.LOG_DIR_EXPANSION_VAR
- + "/taskmanager-stderr.log";
- ctx.setCommands(Collections.singletonList(tmCommand));
-
- LOG.info("Starting TM with command="+tmCommand);
-
- // copy resources to the TaskManagers.
- Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(2);
- localResources.put("flink.jar", flinkJar);
- localResources.put("flink-conf.yaml", flinkConf);
-
- // add ship resources
- if(!shipListString.isEmpty()) {
- Preconditions.checkNotNull(remoteShipRsc);
- for( int i = 0; i < remoteShipPaths.length; i++) {
- localResources.put(new Path(remoteShipPaths[i]).getName(), remoteShipRsc[i]);
- }
- }
-
-
- ctx.setLocalResources(localResources);
-
- // Setup CLASSPATH for Container (=TaskTracker)
- Map<String, String> containerEnv = new HashMap<String, String>();
- Utils.setupEnv(conf, containerEnv); //add flink.jar to class path.
- containerEnv.put(Client.ENV_CLIENT_USERNAME, yarnClientUsername);
-
- ctx.setEnvironment(containerEnv);
-
- UserGroupInformation user = UserGroupInformation.getCurrentUser();
- try {
- Credentials credentials = user.getCredentials();
- DataOutputBuffer dob = new DataOutputBuffer();
- credentials.writeTokenStorageToStream(dob);
- ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(),
- 0, dob.getLength());
- ctx.setTokens(securityTokens);
- } catch (IOException e) {
- LOG.warn("Getting current user info failed when trying to launch the container"
- + e.getMessage());
- }
-
- LOG.info("Launching container " + allocatedContainers);
- nmClient.startContainer(container, ctx);
- }
- for (ContainerStatus status : response.getCompletedContainersStatuses()) {
- ++completedContainers;
- LOG.info("Completed container (while allocating) "+status.getContainerId()+". Total Completed:" + completedContainers);
- LOG.info("Diagnostics "+status.getDiagnostics());
- }
- Thread.sleep(100);
- }
-
- // Now wait for containers to complete
-
- while (completedContainers < taskManagerCount) {
- AllocateResponse response = rmClient.allocate(completedContainers
- / taskManagerCount);
- for (ContainerStatus status : response.getCompletedContainersStatuses()) {
- ++completedContainers;
- LOG.info("Completed container "+status.getContainerId()+". Total Completed:" + completedContainers);
- LOG.info("Diagnostics "+status.getDiagnostics());
- }
- Thread.sleep(5000);
- }
- LOG.info("Shutting down JobManager");
- jm.shutdown();
-
- // Un-register with ResourceManager
- rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "");
-
-
- }
- public static void main(String[] args) throws Exception {
- final String yarnClientUsername = System.getenv(Client.ENV_CLIENT_USERNAME);
- LOG.info("YARN daemon runs as '"+UserGroupInformation.getCurrentUser().getShortUserName()+"' setting"
- + " user to execute Flink ApplicationMaster/JobManager to '"+yarnClientUsername+"'");
- UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername);
- for(Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) {
- ugi.addToken(toks);
- }
- ugi.doAs(new PrivilegedAction<Object>() {
- @Override
- public Object run() {
- try {
- new ApplicationMaster().run();
- } catch (Exception e) {
- e.printStackTrace();
- }
- return null;
- }
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
index 6d4c7b5..a2090f1 100644
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
@@ -20,13 +20,16 @@
package org.apache.flink.yarn;
+import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
+import java.io.InputStreamReader;
import java.io.PrintWriter;
+import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -47,6 +50,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.flink.client.CliFrontend;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.yarn.rpc.YARNClientMasterProtocol.Message;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -75,22 +79,23 @@ import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
+
/**
* All classes in this package contain code taken from
* https://github.com/apache/hadoop-common/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?source=cc
* and
* https://github.com/hortonworks/simple-yarn-app
- * and
+ * and
* https://github.com/yahoo/storm-yarn/blob/master/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java
- *
- * The Flink jar is uploaded to HDFS by this client.
+ *
+ * The Flink jar is uploaded to HDFS by this client.
* The application master and all the TaskManager containers get the jar file downloaded
* by YARN into their local fs.
- *
+ *
*/
public class Client {
private static final Log LOG = LogFactory.getLog(Client.class);
-
+
/**
* Command Line argument options
*/
@@ -107,11 +112,11 @@ public class Client {
private static final Option TM_CORES = new Option("tmc","taskManagerCores",true, "Virtual CPU cores per TaskManager");
private static final Option CONTAINER = new Option("n","container",true, "Number of Yarn container to allocate (=Number of"
+ " TaskTrackers)");
-
+
/**
* Constants
*/
- // environment variable names
+ // environment variable names
public final static String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY";
public final static String ENV_TM_CORES = "_CLIENT_TM_CORES";
public final static String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
@@ -120,15 +125,28 @@ public class Client {
public static final String ENV_CLIENT_HOME_DIR = "_CLIENT_HOME_DIR";
public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME";
-
+ public static final String ENV_AM_PRC_PORT = "_AM_PRC_PORT";
+
private static final String CONFIG_FILE_NAME = "flink-conf.yaml";
-
-
+ /**
+ * Seconds to wait between each status query to the AM.
+ */
+ private static final int CLIENT_POLLING_INTERVALL = 3;
+
private Configuration conf;
+ private YarnClient yarnClient;
+
+ private ClientMasterControl cmc;
+
+ private ApplicationId appId;
+
+ private File addrFile;
+
+ private Path sessionFilesDir;
public void run(String[] args) throws Exception {
-
+
if(UserGroupInformation.isSecurityEnabled()) {
throw new RuntimeException("Flink YARN client does not have security support right now."
+ "File a bug, we will fix it asap");
@@ -149,7 +167,7 @@ public class Client {
options.addOption(QUEUE);
options.addOption(QUERY);
options.addOption(SHIP_PATH);
-
+
CommandLineParser parser = new PosixParser();
CommandLine cmd = null;
try {
@@ -159,7 +177,7 @@ public class Client {
printUsage();
System.exit(1);
}
-
+
if (System.getProperty("log4j.configuration") == null) {
Logger root = Logger.getRootLogger();
root.removeAllAppenders();
@@ -173,8 +191,8 @@ public class Client {
root.setLevel(Level.INFO);
}
}
-
-
+
+
// Jar Path
Path localJarPath;
if(cmd.hasOption(FLINK_JAR.getOpt())) {
@@ -186,15 +204,15 @@ public class Client {
} else {
localJarPath = new Path("file://"+Client.class.getProtectionDomain().getCodeSource().getLocation().getPath());
}
-
+
if(cmd.hasOption(GEN_CONF.getOpt())) {
LOG.info("Placing default configuration in current directory");
File outFile = generateDefaultConf(localJarPath);
LOG.info("File written to "+outFile.getAbsolutePath());
System.exit(0);
}
-
- // Conf Path
+
+ // Conf Path
Path confPath = null;
String confDirPath = "";
if(cmd.hasOption(FLINK_CONF_DIR.getOpt())) {
@@ -207,7 +225,7 @@ public class Client {
confPath = new Path(confFile.getAbsolutePath());
} else {
System.out.println("No configuration file has been specified");
-
+
// no configuration path given.
// -> see if there is one in the current directory
File currDir = new File(".");
@@ -229,7 +247,7 @@ public class Client {
System.exit(1);
} else if(candidates.length == 1) {
confPath = new Path(candidates[0].toURI());
- }
+ }
}
}
List<File> shipFiles = new ArrayList<File>();
@@ -257,25 +275,25 @@ public class Client {
hasLog4j = true;
}
}
-
+
// queue
String queue = "default";
if(cmd.hasOption(QUEUE.getOpt())) {
queue = cmd.getOptionValue(QUEUE.getOpt());
}
-
+
// JobManager Memory
int jmMemory = 512;
if(cmd.hasOption(JM_MEMORY.getOpt())) {
jmMemory = Integer.valueOf(cmd.getOptionValue(JM_MEMORY.getOpt()));
}
-
+
// Task Managers memory
int tmMemory = 1024;
if(cmd.hasOption(TM_MEMORY.getOpt())) {
tmMemory = Integer.valueOf(cmd.getOptionValue(TM_MEMORY.getOpt()));
}
-
+
// Task Managers vcores
int tmCores = 1;
if(cmd.hasOption(TM_CORES.getOpt())) {
@@ -288,24 +306,24 @@ public class Client {
jmPort = ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT;
}
conf = Utils.initializeYarnConfiguration();
-
+
// intialize HDFS
LOG.info("Copy App Master jar from local filesystem and add to local environment");
- // Copy the application master jar to the filesystem
- // Create a local resource to point to the destination jar path
+ // Copy the application master jar to the filesystem
+ // Create a local resource to point to the destination jar path
final FileSystem fs = FileSystem.get(conf);
-
+
if(fs.getScheme().startsWith("file")) {
LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the "
+ "specified Hadoop configuration path is wrong and the sytem is using the default Hadoop configuration values."
+ "The Flink YARN client needs to store its files in a distributed file system");
}
-
+
// Create yarnClient
final YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf);
yarnClient.start();
-
+
// Query cluster for metrics
if(cmd.hasOption(QUERY.getOpt())) {
showClusterMetrics(yarnClient);
@@ -316,10 +334,10 @@ public class Client {
yarnClient.stop();
System.exit(1);
}
-
+
// TM Count
final int taskManagerCount = Integer.valueOf(cmd.getOptionValue(CONTAINER.getOpt()));
-
+
System.out.println("Using values:");
System.out.println("\tContainer Count = "+taskManagerCount);
System.out.println("\tJar Path = "+localJarPath.toUri().getPath());
@@ -364,31 +382,31 @@ public class Client {
yarnClient.stop();
System.exit(1);
}
-
+
// respect custom JVM options in the YAML file
final String javaOpts = GlobalConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
-
+
// Set up the container launch context for the application master
ContainerLaunchContext amContainer = Records
.newRecord(ContainerLaunchContext.class);
-
+
String amCommand = "$JAVA_HOME/bin/java"
+ " -Xmx"+Utils.calculateHeapSize(jmMemory)+"M " +javaOpts;
if(hasLog4j) {
amCommand += " -Dlog.file=\""+ApplicationConstants.LOG_DIR_EXPANSION_VAR +"/jobmanager-log4j.log\" -Dlog4j.configuration=file:log4j.properties";
}
- amCommand += " org.apache.flink.yarn.ApplicationMaster" + " "
+ amCommand += " org.apache.flink.yarn.appMaster.ApplicationMaster" + " "
+ " 1>"
+ ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stdout.log"
+ " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stderr.log";
amContainer.setCommands(Collections.singletonList(amCommand));
-
+
System.err.println("amCommand="+amCommand);
-
+
// Set-up ApplicationSubmissionContext for the application
ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
final ApplicationId appId = appContext.getApplicationId();
-
+
// Setup jar for ApplicationMaster
LocalResource appMasterJar = Records.newRecord(LocalResource.class);
LocalResource flinkConf = Records.newRecord(LocalResource.class);
@@ -397,8 +415,8 @@ public class Client {
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(2);
localResources.put("flink.jar", appMasterJar);
localResources.put("flink-conf.yaml", flinkConf);
-
-
+
+
// setup security tokens (code from apache storm)
final Path[] paths = new Path[3 + shipFiles.size()];
StringBuffer envShipFileList = new StringBuffer();
@@ -410,7 +428,7 @@ public class Client {
paths[3 + i] = Utils.setupLocalResource(conf, fs, appId.toString(),
shipLocalPath, shipResources, fs.getHomeDirectory());
localResources.put(shipFile.getName(), shipResources);
-
+
envShipFileList.append(paths[3 + i]);
if(i+1 < shipFiles.size()) {
envShipFileList.append(',');
@@ -419,15 +437,16 @@ public class Client {
paths[0] = remotePathJar;
paths[1] = remotePathConf;
- paths[2] = new Path(fs.getHomeDirectory(), ".flink/" + appId.toString() + "/");
+ sessionFilesDir = new Path(fs.getHomeDirectory(), ".flink/" + appId.toString() + "/");
FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
- fs.setPermission(paths[2], permission); // set permission for path.
+ fs.setPermission(sessionFilesDir, permission); // set permission for path.
Utils.setTokensFor(amContainer, paths, this.conf);
-
-
+
+
amContainer.setLocalResources(localResources);
fs.close();
+ int amRPCPort = GlobalConfiguration.getInteger(ConfigConstants.YARN_AM_PRC_PORT, ConfigConstants.DEFAULT_YARN_AM_RPC_PORT);
// Setup CLASSPATH for ApplicationMaster
Map<String, String> appMasterEnv = new HashMap<String, String>();
Utils.setupEnv(conf, appMasterEnv);
@@ -440,52 +459,35 @@ public class Client {
appMasterEnv.put(Client.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString());
appMasterEnv.put(Client.ENV_CLIENT_SHIP_FILES, envShipFileList.toString() );
appMasterEnv.put(Client.ENV_CLIENT_USERNAME, UserGroupInformation.getCurrentUser().getShortUserName());
-
+ appMasterEnv.put(Client.ENV_AM_PRC_PORT, String.valueOf(amRPCPort));
+
amContainer.setEnvironment(appMasterEnv);
-
+
// Set up resource type requirements for ApplicationMaster
Resource capability = Records.newRecord(Resource.class);
capability.setMemory(jmMemory);
capability.setVirtualCores(1);
-
+
appContext.setApplicationName("Flink"); // application name
appContext.setAMContainerSpec(amContainer);
appContext.setResource(capability);
appContext.setQueue(queue);
-
+
// file that we write into the conf/ dir containing the jobManager address.
- final File addrFile = new File(confDirPath + CliFrontend.JOBMANAGER_ADDRESS_FILE);
-
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- try {
- LOG.info("Killing the Flink-YARN application.");
- yarnClient.killApplication(appId);
- LOG.info("Deleting files in "+paths[2]);
- FileSystem shutFS = FileSystem.get(conf);
- shutFS.delete(paths[2], true); // delete conf and jar file.
- shutFS.close();
- } catch (Exception e) {
- LOG.warn("Exception while killing the YARN application", e);
- }
- try {
- addrFile.delete();
- } catch (Exception e) {
- LOG.warn("Exception while deleting the jobmanager address file", e);
- }
- LOG.info("YARN Client is shutting down");
- yarnClient.stop();
- }
- });
-
+ addrFile = new File(confDirPath + CliFrontend.JOBMANAGER_ADDRESS_FILE);
+
+
LOG.info("Submitting application master " + appId);
yarnClient.submitApplication(appContext);
ApplicationReport appReport = yarnClient.getApplicationReport(appId);
YarnApplicationState appState = appReport.getYarnApplicationState();
boolean told = false;
char[] el = { '/', '|', '\\', '-'};
- int i = 0;
+ int i = 0;
+ int numTaskmanagers = 0;
+
+ BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
+
while (appState != YarnApplicationState.FINISHED
&& appState != YarnApplicationState.KILLED
&& appState != YarnApplicationState.FAILED) {
@@ -493,11 +495,15 @@ public class Client {
System.err.println("Flink JobManager is now running on "+appReport.getHost()+":"+jmPort);
System.err.println("JobManager Web Interface: "+appReport.getTrackingUrl());
// write jobmanager connect information
-
PrintWriter out = new PrintWriter(addrFile);
out.println(appReport.getHost()+":"+jmPort);
out.close();
addrFile.setReadable(true, false); // readable for all.
+
+ // connect RPC service
+ cmc = new ClientMasterControl(new InetSocketAddress(appReport.getHost(), amRPCPort));
+ cmc.start();
+ Runtime.getRuntime().addShutdownHook(new ClientShutdownHook());
told = true;
}
if(!told) {
@@ -507,24 +513,113 @@ public class Client {
}
Thread.sleep(500); // wait for the application to switch to RUNNING
} else {
- Thread.sleep(5000);
+ int newTmCount = cmc.getNumberOfTaskManagers();
+ if(numTaskmanagers != newTmCount) {
+ System.err.println("Number of connected TaskManagers changed to "+newTmCount+" slots available: "+cmc.getNumberOfAvailableSlots());
+ numTaskmanagers = newTmCount;
+ }
+ if(cmc.getFailedStatus()) {
+ System.err.println("The Application Master failed!\nMessages:\n");
+ for(Message m: cmc.getMessages() ) {
+ System.err.println("Message: "+m.text);
+ }
+ System.err.println("Requesting Application Master shutdown");
+ cmc.shutdownAM();
+ System.err.println("Application Master closed.");
+ }
+ for(Message m: cmc.getMessages() ) {
+ System.err.println("Message: "+m.text);
+ }
+
+ // wait until CLIENT_POLLING_INTERVALL is over or the user entered something.
+ long startTime = System.currentTimeMillis();
+ while ((System.currentTimeMillis() - startTime) < CLIENT_POLLING_INTERVALL * 1000
+ && !in.ready()) {
+ Thread.sleep(200);
+ }
+ if (in.ready()) {
+ String command = in.readLine();
+ evalCommand(command);
+ }
+
}
-
+
appReport = yarnClient.getApplicationReport(appId);
appState = appReport.getYarnApplicationState();
}
LOG.info("Application " + appId + " finished with"
- + " state " + appState + " at " + appReport.getFinishTime());
+ + " state " + appState + "and final state " + appReport.getFinalApplicationStatus() + " at " + appReport.getFinishTime());
+
if(appState == YarnApplicationState.FAILED || appState == YarnApplicationState.KILLED ) {
LOG.warn("Application failed. Diagnostics "+appReport.getDiagnostics());
}
-
+
+ }
+
+ private void printHelp() {
+ System.err.println("Available commands:\n"
+ + "\t stop : Stop the YARN session\n"
+ // + "\t add n : Add n TaskManagers to the YARN session\n"
+ // + "\t remove n : Remove n TaskManagers to the YARN session\n"
+ + "\t allmsg : Show all messages\n");
}
+ private void evalCommand(String command) {
+ if(command.equals("help")) {
+ printHelp();
+ } else if(command.equals("stop") || command.equals("quit") || command.equals("exit")) {
+ stopSession();
+ System.exit(0);
+ } else if(command.equals("allmsg")) {
+ System.err.println("All messages from the ApplicationMaster:");
+ for(Message m: cmc.getMessages() ) {
+ System.err.println("Message: "+m.text);
+ }
+ } else if(command.startsWith("add")) {
+ String nStr = command.replace("add", "").trim();
+ int n = Integer.valueOf(nStr);
+ System.err.println("Adding "+n+" TaskManagers to the session");
+ cmc.addTaskManagers(n);
+ } else {
+ System.err.println("Unknown command '"+command+"'");
+ printHelp();
+ }
+ }
+
+ private void stopSession() {
+ try {
+ LOG.info("Sending shutdown request to the Application Master");
+ cmc.shutdownAM();
+ yarnClient.killApplication(appId);
+ LOG.info("Deleting files in "+sessionFilesDir );
+ FileSystem shutFS = FileSystem.get(conf);
+ shutFS.delete(sessionFilesDir, true); // delete conf and jar file.
+ shutFS.close();
+ cmc.close();
+ } catch (Exception e) {
+ LOG.warn("Exception while killing the YARN application", e);
+ }
+ try {
+ addrFile.delete();
+ } catch (Exception e) {
+ LOG.warn("Exception while deleting the JobManager address file", e);
+ }
+ LOG.info("YARN Client is shutting down");
+ yarnClient.stop();
+ }
+
+ public class ClientShutdownHook extends Thread {
+ @Override
+ public void run() {
+ stopSession();
+ }
+ }
+
+	/**
+	 * Plain holder for the values filled in by getCurrentFreeClusterResources():
+	 * totalFreeMemory and containerLimit.
+	 * NOTE(review): judging by the names these are the cluster's total free memory and the
+	 * largest container that can be requested; units are presumably MB — confirm against
+	 * getCurrentFreeClusterResources().
+	 */
 private static class ClusterResourceDescription {
 public int totalFreeMemory;
 public int containerLimit;
 }
+
private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yarnClient) throws YarnException, IOException {
ClusterResourceDescription crd = new ClusterResourceDescription();
crd.totalFreeMemory = 0;
@@ -549,13 +644,10 @@ public class Client {
Options req = new Options();
req.addOption(CONTAINER);
formatter.printHelp(" ", req);
-
+
formatter.setSyntaxPrefix(" Optional");
Options opt = new Options();
opt.addOption(VERBOSE);
- // opt.addOption(GEN_CONF);
- // opt.addOption(STRATOSPHERE_CONF);
- // opt.addOption(STRATOSPHERE_JAR);
opt.addOption(JM_MEMORY);
opt.addOption(TM_MEMORY);
opt.addOption(TM_CORES);
@@ -604,10 +696,10 @@ public class Client {
System.exit(1);
}
InputStream confStream = jar.getInputStream(jar.getEntry("flink-conf.yaml"));
-
+
if(confStream == null) {
LOG.warn("Given jar file does not contain yaml conf.");
- confStream = this.getClass().getResourceAsStream("flink-conf.yaml");
+ confStream = this.getClass().getResourceAsStream("flink-conf.yaml");
if(confStream == null) {
throw new RuntimeException("Unable to find flink-conf in jar file");
}
@@ -630,4 +722,6 @@ public class Client {
Client c = new Client();
c.run(args);
}
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ClientMasterControl.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ClientMasterControl.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ClientMasterControl.java
new file mode 100644
index 0000000..44633ea
--- /dev/null
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ClientMasterControl.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.runtime.ipc.RPC;
+import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.yarn.rpc.ApplicationMasterStatus;
+import org.apache.flink.yarn.rpc.YARNClientMasterProtocol;
+import org.apache.flink.yarn.rpc.YARNClientMasterProtocol.Message;
+
+
+/**
+ * Client-side thread that polls the ApplicationMaster of a Flink YARN session over RPC.
+ *
+ * <p>Every poll fetches the AM status. It also fetches the AM's messages the first time
+ * and whenever the reported message count differs from the cached list. The last polled
+ * values are cached and served by the getters. Other methods forward control requests
+ * (shutdown, adding TaskManagers) to the AM.
+ */
+public class ClientMasterControl extends Thread {
+	private static final Log LOG = LogFactory.getLog(ClientMasterControl.class);
+
+	/** Pause between two polls of the ApplicationMaster, in milliseconds. */
+	private static final long POLL_INTERVAL_MILLIS = 5000;
+
+	private final InetSocketAddress applicationMasterAddress;
+
+	/** Last status received from the AM; {@code null} before the first poll. Guarded by {@link #lock}. */
+	private ApplicationMasterStatus appMasterStatus;
+
+	/** RPC proxy to the AM; created by {@link #run()} and read by other threads, hence volatile. */
+	private volatile YARNClientMasterProtocol cmp;
+
+	private final Object lock = new Object();
+
+	/** Last message list received from the AM; {@code null} until fetched once. Guarded by {@link #lock}. */
+	private List<Message> messages;
+
+	/** Cleared by {@link #close()} from another thread; volatile so the polling loop sees it. */
+	private volatile boolean running = true;
+
+	public ClientMasterControl(InetSocketAddress applicationMasterAddress) {
+		super();
+		this.applicationMasterAddress = applicationMasterAddress;
+	}
+
+	@Override
+	public void run() {
+		try {
+			cmp = RPC.getProxy(YARNClientMasterProtocol.class, applicationMasterAddress, NetUtils.getSocketFactory());
+
+			while (running) {
+				synchronized (lock) {
+					appMasterStatus = cmp.getAppplicationMasterStatus();
+					// fetch the messages initially and whenever the AM reports a different count
+					if (appMasterStatus != null
+							&& (messages == null || messages.size() != appMasterStatus.getMessageCount())) {
+						messages = cmp.getMessages();
+					}
+				}
+
+				try {
+					Thread.sleep(POLL_INTERVAL_MILLIS);
+				} catch (InterruptedException e) {
+					// treat an interrupt as a request to stop polling and preserve the interrupt status
+					LOG.warn("Interrupted while waiting for the next application status poll", e);
+					Thread.currentThread().interrupt();
+					break;
+				}
+			}
+			RPC.stopProxy(cmp);
+		} catch (IOException e) {
+			LOG.warn("Error while running RPC service", e);
+		}
+	}
+
+	/** @return the number of TaskManagers last reported by the AM, or 0 if no status was received yet */
+	public int getNumberOfTaskManagers() {
+		synchronized (lock) {
+			if (appMasterStatus == null) {
+				return 0;
+			}
+			return appMasterStatus.getNumberOfTaskManagers();
+		}
+	}
+
+	/** @return the number of available slots last reported by the AM, or 0 if no status was received yet */
+	public int getNumberOfAvailableSlots() {
+		synchronized (lock) {
+			if (appMasterStatus == null) {
+				return 0;
+			}
+			return appMasterStatus.getNumberOfAvailableSlots();
+		}
+	}
+
+	/** @return the failed flag last reported by the AM, or {@code false} if no status was received yet */
+	public boolean getFailedStatus() {
+		synchronized (lock) {
+			if (appMasterStatus == null) {
+				return false;
+			}
+			return appMasterStatus.getFailed();
+		}
+	}
+
+	/**
+	 * Asks the AM to shut down.
+	 *
+	 * @return the AM's answer, or {@code false} if the request failed (the error is logged)
+	 */
+	public boolean shutdownAM() {
+		try {
+			return cmp.shutdownAM().getValue();
+		} catch (Throwable e) {
+			LOG.warn("Error shutting down the application master", e);
+			return false;
+		}
+	}
+
+	/** @return a snapshot of the messages last fetched from the AM; empty if none were fetched yet */
+	public List<Message> getMessages() {
+		synchronized (lock) {
+			if (this.messages == null) {
+				return new ArrayList<Message>();
+			}
+			return new ArrayList<Message>(this.messages);
+		}
+	}
+
+	/** Stops the polling loop after its current iteration. */
+	public void close() {
+		running = false;
+	}
+
+	/**
+	 * Forwards a request to add {@code n} TaskManagers to the AM.
+	 *
+	 * @throws IllegalStateException if the RPC connection has not been established yet
+	 */
+	public void addTaskManagers(int n) {
+		YARNClientMasterProtocol proxy = cmp;
+		if (proxy == null) {
+			throw new IllegalStateException("Not connected to the ApplicationMaster yet");
+		}
+		proxy.addTaskManagers(n);
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index b72b9bc..bd2cb9e 100644
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -106,7 +106,7 @@ public class Utils {
*
*/
public static int calculateHeapSize(int memory) {
- int heapLimit = (int)((float)memory*0.85);
+ int heapLimit = (int)((float)memory*0.80);
if( (memory - heapLimit) > HEAP_LIMIT_CAP) {
heapLimit = memory-HEAP_LIMIT_CAP;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
deleted file mode 100644
index b541317..0000000
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-
-package org.apache.flink.yarn;
-
-import java.io.IOException;
-import java.security.PrivilegedAction;
-import java.util.Arrays;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-
-public class YarnTaskManagerRunner {
-
- private static final Log LOG = LogFactory.getLog(YarnTaskManagerRunner.class);
-
- public static void main(final String[] args) throws IOException {
- Map<String, String> envs = System.getenv();
- final String yarnClientUsername = envs.get(Client.ENV_CLIENT_USERNAME);
- final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
-
- // configure local directory
- final String[] newArgs = Arrays.copyOf(args, args.length + 2);
- newArgs[newArgs.length-2] = "-"+TaskManager.ARG_CONF_DIR;
- newArgs[newArgs.length-1] = localDirs;
- LOG.info("Setting log path "+localDirs);
- LOG.info("YARN daemon runs as '"+UserGroupInformation.getCurrentUser().getShortUserName()+"' setting"
- + " user to execute Flink TaskManager to '"+yarnClientUsername+"'");
- UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername);
- for(Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) {
- ugi.addToken(toks);
- }
- ugi.doAs(new PrivilegedAction<Object>() {
- @Override
- public Object run() {
- try {
- TaskManager.main(newArgs);
- } catch (Exception e) {
- e.printStackTrace();
- }
- return null;
- }
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java
new file mode 100644
index 0000000..2186fca
--- /dev/null
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java
@@ -0,0 +1,554 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.appMaster;
+
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Writer;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.ipc.RPC;
+import org.apache.flink.runtime.ipc.RPC.Server;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.util.SerializableArrayList;
+import org.apache.flink.types.BooleanValue;
+import org.apache.flink.util.StringUtils;
+import org.apache.flink.yarn.Client;
+import org.apache.flink.yarn.Utils;
+import org.apache.flink.yarn.rpc.ApplicationMasterStatus;
+import org.apache.flink.yarn.rpc.YARNClientMasterProtocol;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.util.Records;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * ApplicationMaster of a Flink YARN session.
+ *
+ * <p>It runs the Flink JobManager in its own JVM and requests one container per TaskManager
+ * from the ResourceManager. It launches {@link YarnTaskManagerRunner} in each container and
+ * serves the {@link YARNClientMasterProtocol} RPC interface, which the YARN client uses to
+ * poll status and messages and to request a shutdown.
+ */
+public class ApplicationMaster implements YARNClientMasterProtocol {
+
+	private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
+
+	private final String currDir;
+	private final String logDirs;
+	private final String ownHostname;
+	private final String appId;
+	private final String clientHomeDir;
+	private final String applicationMasterHost;
+	private final String remoteFlinkJarPath;
+	private final String shipListString;
+	private final String yarnClientUsername;
+	private final String rpcPort;
+	private final int taskManagerCount;
+	private final int memoryPerTaskManager;
+	private final int coresPerTaskManager;
+	private final String localWebInterfaceDir;
+	private final Configuration conf;
+
+	/**
+	 * File system for interacting with Flink's files such as the jar
+	 * and the configuration.
+	 */
+	private FileSystem fs;
+
+	/**
+	 * The JobManager that is running in the same JVM as this Application Master.
+	 */
+	private JobManager jobManager;
+
+	/**
+	 * RPC server for letting the YARN client connect to this AM.
+	 * This RPC connection is handling application specific requests.
+	 */
+	private final Server amRpcServer;
+
+	/**
+	 * RPC connection of the AppMaster to the Resource Manager (YARN master)
+	 */
+	private AMRMClient<ContainerRequest> rmClient;
+
+	/**
+	 * RPC connection to the Node Manager.
+	 */
+	private NMClient nmClient;
+
+	/**
+	 * Messages of the AM that the YARN client is showing the user in the YARN session
+	 */
+	private List<Message> messages = new SerializableArrayList<Message>();
+
+	/**
+	 * Indicates if a log4j config file is being shipped.
+	 */
+	private boolean hasLog4j;
+
+	/**
+	 * Heap size of TaskManager containers in MB.
+	 */
+	private int heapLimit;
+
+	/**
+	 * Number of containers that stopped running
+	 */
+	private int completedContainers = 0;
+
+	/**
+	 * Local resources used by all Task Manager containers.
+	 */
+	Map<String, LocalResource> taskManagerLocalResources;
+
+	/**
+	 * Flag indicating if the YARN session has failed.
+	 * A session failed if all containers stopped or an error occurred.
+	 * The ApplicationMaster will not close the RPC connection if it has failed (so
+	 * that the client can still retrieve the messages and then shut it down).
+	 * Volatile because it is read by RPC handler threads; changes are signalled on {@link #failedLock}.
+	 */
+	private volatile boolean isFailed = false;
+
+	/**
+	 * Monitor used to wait for and signal changes of {@link #isFailed}. A dedicated object is
+	 * required: synchronizing on a reassigned Boolean field would lock a different (canonical,
+	 * JVM-wide shared) Boolean instance after every assignment.
+	 */
+	private final Object failedLock = new Object();
+
+	/**
+	 * Reads the container environment provided by the YARN client, loads the Flink
+	 * configuration from the working directory and starts the AM RPC server.
+	 */
+	public ApplicationMaster(Configuration conf) throws IOException {
+		fs = FileSystem.get(conf);
+		Map<String, String> envs = System.getenv();
+		currDir = envs.get(Environment.PWD.key());
+		logDirs = envs.get(Environment.LOG_DIRS.key());
+		ownHostname = envs.get(Environment.NM_HOST.key());
+		appId = envs.get(Client.ENV_APP_ID);
+		clientHomeDir = envs.get(Client.ENV_CLIENT_HOME_DIR);
+		applicationMasterHost = envs.get(Environment.NM_HOST.key());
+		remoteFlinkJarPath = envs.get(Client.FLINK_JAR_PATH);
+		shipListString = envs.get(Client.ENV_CLIENT_SHIP_FILES);
+		yarnClientUsername = envs.get(Client.ENV_CLIENT_USERNAME);
+		rpcPort = envs.get(Client.ENV_AM_PRC_PORT);
+		taskManagerCount = Integer.valueOf(envs.get(Client.ENV_TM_COUNT));
+		memoryPerTaskManager = Integer.valueOf(envs.get(Client.ENV_TM_MEMORY));
+		coresPerTaskManager = Integer.valueOf(envs.get(Client.ENV_TM_CORES));
+		localWebInterfaceDir = currDir+"/resources/"+ConfigConstants.DEFAULT_JOB_MANAGER_WEB_PATH_NAME;
+		this.conf = conf;
+
+		if(currDir == null) {
+			throw new RuntimeException("Current directory unknown");
+		}
+		if(ownHostname == null) {
+			throw new RuntimeException("Own hostname ("+Environment.NM_HOST+") not set.");
+		}
+		LOG.info("Working directory "+currDir);
+
+		// load Flink configuration.
+		Utils.getFlinkConfiguration(currDir);
+
+		// start AM RPC service
+		amRpcServer = RPC.getServer(this, ownHostname, Integer.valueOf(rpcPort), 2);
+		amRpcServer.start();
+	}
+
+	/**
+	 * Sets the failed flag and wakes up threads waiting in {@link #keepRPCAlive()}.
+	 */
+	private void setFailed(boolean failed) {
+		synchronized (failedLock) {
+			this.isFailed = failed;
+			failedLock.notifyAll();
+		}
+	}
+
+	/**
+	 * Writes flink-conf-modified.yaml, a copy of flink-conf.yaml in the working directory with
+	 * the JobManager address, web root path and web log path pointing at this container.
+	 */
+	private void generateConfigurationFile() throws IOException {
+		// Update yaml conf -> set jobManager address to this machine's address.
+		BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(currDir+"/flink-conf.yaml")));
+		try {
+			Writer output = new BufferedWriter(new FileWriter(currDir+"/flink-conf-modified.yaml"));
+			try {
+				String line;
+				while ((line = br.readLine()) != null) {
+					if(line.contains(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY)) {
+						output.append(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY+": "+ownHostname+"\n");
+					} else if(line.contains(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY)) {
+						output.append(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY+": "+"\n");
+					} else {
+						output.append(line+"\n");
+					}
+				}
+				// just to make sure.
+				output.append(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY+": "+ownHostname+"\n");
+				output.append(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY+": "+localWebInterfaceDir+"\n");
+				output.append(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY+": "+logDirs+"\n");
+			} finally {
+				output.close();
+			}
+		} finally {
+			br.close();
+		}
+		File newConf = new File(currDir+"/flink-conf-modified.yaml");
+		if(!newConf.exists()) {
+			LOG.warn("modified yaml does not exist!");
+		}
+	}
+
+	/**
+	 * Extracts the web interface resources and starts the JobManager (plus its info server)
+	 * in this JVM using the modified configuration.
+	 */
+	private void startJobManager() throws Exception {
+		Utils.copyJarContents("resources/"+ConfigConstants.DEFAULT_JOB_MANAGER_WEB_PATH_NAME,
+				ApplicationMaster.class.getProtectionDomain().getCodeSource().getLocation().getPath());
+
+		String pathToNepheleConfig = currDir+"/flink-conf-modified.yaml";
+		String[] args = {"-executionMode","cluster", "-configDir", pathToNepheleConfig};
+
+		// start the job manager
+		jobManager = JobManager.initialize( args );
+
+		// Start info server for jobmanager
+		jobManager.startInfoServer();
+	}
+
+	private void setRMClient(AMRMClient<ContainerRequest> rmClient) {
+		this.rmClient = rmClient;
+	}
+
+	/**
+	 * Main loop of the AM.
+	 *
+	 * <p>It registers with the ResourceManager, requests and launches the TaskManager
+	 * containers, and then blocks until all containers have completed. Afterwards it shuts
+	 * down the JobManager and unregisters with status FAILED, passing the collected container
+	 * diagnostics.
+	 */
+	private void run() throws Exception {
+		heapLimit = Utils.calculateHeapSize(memoryPerTaskManager);
+
+		nmClient = NMClient.createNMClient();
+		nmClient.init(conf);
+		nmClient.start();
+		nmClient.cleanupRunningContainersOnStop(true);
+
+		// Register with ResourceManager
+		LOG.info("Registering ApplicationMaster");
+		rmClient.registerApplicationMaster(applicationMasterHost, 0, "http://"+applicationMasterHost+":"+GlobalConfiguration.getString(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, "undefined"));
+
+		// Priority for worker containers - priorities are intra-application
+		Priority priority = Records.newRecord(Priority.class);
+		priority.setPriority(0);
+
+		// Resource requirements for worker containers
+		Resource capability = Records.newRecord(Resource.class);
+		capability.setMemory(memoryPerTaskManager);
+		capability.setVirtualCores(coresPerTaskManager);
+
+		// Make container requests to ResourceManager
+		for (int i = 0; i < taskManagerCount; ++i) {
+			ContainerRequest containerAsk = new ContainerRequest(capability,
+					null, null, priority);
+			LOG.info("Requesting TaskManager container " + i);
+			rmClient.addContainerRequest(containerAsk);
+		}
+
+		LocalResource flinkJar = Records.newRecord(LocalResource.class);
+		LocalResource flinkConf = Records.newRecord(LocalResource.class);
+
+		// register Flink Jar with remote HDFS
+		final Path remoteJarPath = new Path(remoteFlinkJarPath);
+		Utils.registerLocalResource(fs, remoteJarPath, flinkJar);
+
+		// register conf with local fs.
+		Utils.setupLocalResource(conf, fs, appId, new Path("file://"+currDir+"/flink-conf-modified.yaml"), flinkConf, new Path(clientHomeDir));
+		LOG.info("Prepared local resource for modified yaml: "+flinkConf);
+
+		hasLog4j = new File(currDir+"/log4j.properties").exists();
+
+		// copy resources to the TaskManagers.
+		taskManagerLocalResources = new HashMap<String, LocalResource>();
+		taskManagerLocalResources.put("flink.jar", flinkJar);
+		taskManagerLocalResources.put("flink-conf.yaml", flinkConf);
+
+		// add ship resources. Each file is registered and stored under its own name in one
+		// step, so empty entries in the comma-separated list cannot misalign anything.
+		if (shipListString != null) {
+			for (String remoteShipPathStr : shipListString.split(",")) {
+				if (remoteShipPathStr.isEmpty()) {
+					continue;
+				}
+				LocalResource shipResource = Records.newRecord(LocalResource.class);
+				Path remoteShipPath = new Path(remoteShipPathStr);
+				Utils.registerLocalResource(fs, remoteShipPath, shipResource);
+				taskManagerLocalResources.put(remoteShipPath.getName(), shipResource);
+			}
+		}
+		completedContainers = 0;
+
+		// Obtain allocated containers and launch
+		StringBuffer containerDiag = new StringBuffer(); // diagnostics log for the containers.
+		allocateOutstandingContainer(containerDiag);
+		LOG.info("Allocated all initial containers");
+
+		// Now wait for containers to complete
+		while (completedContainers < taskManagerCount) {
+			// progress is a float in [0,1]; integer division would report 0 until the very end
+			AllocateResponse response = rmClient.allocate((float) completedContainers
+					/ taskManagerCount);
+			for (ContainerStatus status : response.getCompletedContainersStatuses()) {
+				++completedContainers;
+				LOG.info("Completed container "+status.getContainerId()+". Total Completed:" + completedContainers);
+				LOG.info("Diagnostics "+status.getDiagnostics());
+				logDeadContainer(status, containerDiag);
+			}
+			Thread.sleep(5000);
+		}
+		LOG.info("Shutting down JobManager");
+		jobManager.shutdown();
+
+		// Un-register with ResourceManager
+		final String diagnosticsMessage = "Application Master shut down after all "
+				+ "containers finished\n"+containerDiag.toString();
+		LOG.info("Diagnostics message: "+diagnosticsMessage);
+		rmClient.unregisterApplicationMaster(FinalApplicationStatus.FAILED, diagnosticsMessage, "");
+		this.close();
+		LOG.info("Application Master shutdown completed.");
+	}
+
+	/**
+	 * Polls the ResourceManager until {@code taskManagerCount} containers have been allocated,
+	 * and launches a TaskManager in each allocated container. Containers that complete in the
+	 * meantime are counted and their diagnostics are appended to {@code containerDiag}.
+	 */
+	private void allocateOutstandingContainer(StringBuffer containerDiag) throws Exception {
+
+		// respect custom JVM options in the YAML file
+		final String javaOpts = GlobalConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
+
+		int allocatedContainers = 0;
+		while (allocatedContainers < taskManagerCount) {
+			AllocateResponse response = rmClient.allocate(0);
+			for (Container container : response.getAllocatedContainers()) {
+				LOG.info("Got new Container for TM "+container.getId()+" on host "+container.getNodeId().getHost());
+				++allocatedContainers;
+
+				// Launch container by create ContainerLaunchContext
+				ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
+
+				String tmCommand = "$JAVA_HOME/bin/java -Xmx"+heapLimit+"m " + javaOpts ;
+				if(hasLog4j) {
+					tmCommand += " -Dlog.file=\""+ApplicationConstants.LOG_DIR_EXPANSION_VAR +"/taskmanager-log4j.log\" -Dlog4j.configuration=file:log4j.properties";
+				}
+				// derive the main class name from the class itself so it always matches its package
+				tmCommand += " " + YarnTaskManagerRunner.class.getName() + " -configDir . "
+						+ " 1>"
+						+ ApplicationConstants.LOG_DIR_EXPANSION_VAR
+						+ "/taskmanager-stdout.log"
+						+ " 2>"
+						+ ApplicationConstants.LOG_DIR_EXPANSION_VAR
+						+ "/taskmanager-stderr.log";
+				ctx.setCommands(Collections.singletonList(tmCommand));
+
+				LOG.info("Starting TM with command="+tmCommand);
+
+				ctx.setLocalResources(taskManagerLocalResources);
+
+				// Setup CLASSPATH for Container (=TaskTracker)
+				Map<String, String> containerEnv = new HashMap<String, String>();
+				Utils.setupEnv(conf, containerEnv); //add flink.jar to class path.
+				containerEnv.put(Client.ENV_CLIENT_USERNAME, yarnClientUsername);
+
+				ctx.setEnvironment(containerEnv);
+
+				UserGroupInformation user = UserGroupInformation.getCurrentUser();
+				try {
+					Credentials credentials = user.getCredentials();
+					DataOutputBuffer dob = new DataOutputBuffer();
+					credentials.writeTokenStorageToStream(dob);
+					ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(),
+							0, dob.getLength());
+					ctx.setTokens(securityTokens);
+				} catch (IOException e) {
+					LOG.warn("Getting current user info failed when trying to launch the container", e);
+				}
+
+				LOG.info("Launching container " + allocatedContainers);
+				nmClient.startContainer(container, ctx);
+				messages.add(new Message("Launching new container"));
+			}
+			for (ContainerStatus status : response.getCompletedContainersStatuses()) {
+				++completedContainers;
+				LOG.info("Completed container (while allocating) "+status.getContainerId()+". Total Completed:" + completedContainers);
+				LOG.info("Diagnostics "+status.getDiagnostics());
+				logDeadContainer(status, containerDiag);
+			}
+			Thread.sleep(100);
+		}
+	}
+
+	/**
+	 * Records the diagnostics of a completed container both as a client-visible message
+	 * and in the accumulated diagnostics buffer.
+	 */
+	private void logDeadContainer(ContainerStatus status,
+			StringBuffer containerDiag) {
+		String msg = "Diagnostics for containerId="+status.getContainerId()+
+				" in state="+status.getState()+"\n"+status.getDiagnostics();
+		messages.add(new Message(msg) );
+		containerDiag.append("\n\n");
+		containerDiag.append(msg);
+	}
+
+	@Override
+	public ApplicationMasterStatus getAppplicationMasterStatus() {
+		ApplicationMasterStatus amStatus;
+		if(jobManager == null) {
+			// JM not yet started
+			amStatus = new ApplicationMasterStatus(0, 0 );
+		} else {
+			amStatus = new ApplicationMasterStatus(jobManager.getNumberOfTaskManagers(), jobManager.getAvailableSlots() );
+		}
+		amStatus.setMessageCount(messages.size());
+		amStatus.setFailed(isFailed);
+		return amStatus;
+	}
+
+	/**
+	 * Unregisters from the ResourceManager and closes the clients. The final status is
+	 * FAILED if the AM is in failed mode and SUCCEEDED otherwise. Failed mode is left
+	 * (releasing {@link #keepRPCAlive()}) once the unregister attempt has completed.
+	 */
+	@Override
+	public BooleanValue shutdownAM() throws Exception {
+		LOG.info("Client requested shutdown of AM");
+		FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
+		String finalMessage = "";
+		final boolean wasFailed = isFailed;
+		if(wasFailed) {
+			finalStatus = FinalApplicationStatus.FAILED;
+			finalMessage = "Application Master failed";
+		}
+		try {
+			rmClient.unregisterApplicationMaster(finalStatus, finalMessage, "");
+		} finally {
+			if (wasFailed) {
+				// allow a proper shutdown: close() may now stop the RPC server and the
+				// main thread waiting in keepRPCAlive() is released
+				setFailed(false);
+			}
+		}
+		this.close();
+		return new BooleanValue(true);
+	}
+
+	/**
+	 * Closes the NodeManager and ResourceManager clients (those that were created). The AM
+	 * RPC server is stopped only when the AM is not in failed mode, so that a failed AM
+	 * can still serve its error messages to the client.
+	 */
+	private void close() throws Exception {
+		if (nmClient != null) {
+			nmClient.close();
+		}
+		if (rmClient != null) {
+			rmClient.close();
+		}
+		if (isFailed) {
+			LOG.warn("Can not close AM RPC connection since this the AM is in failed state");
+		} else {
+			amRpcServer.stop();
+		}
+	}
+
+	@Override
+	public List<Message> getMessages() {
+		return messages;
+	}
+
+	public void addMessage(Message msg) {
+		messages.add(msg);
+	}
+
+	@Override
+	public void addTaskManagers(int n) {
+		throw new UnsupportedOperationException("Implement me");
+	}
+
+	/**
+	 * Keeps the ApplicationMaster JVM with the Client RPC service running
+	 * to allow it retrieving the error message. Returns once failed mode is left.
+	 */
+	protected void keepRPCAlive() {
+		synchronized (failedLock) {
+			while (isFailed) {
+				try {
+					failedLock.wait(100);
+				} catch (InterruptedException e) {
+					LOG.warn("Error while waiting until end of failed mode of AM", e);
+				}
+			}
+		}
+	}
+
+	public static void main(String[] args) throws Exception {
+		// execute Application Master using the client's user
+		final String yarnClientUsername = System.getenv(Client.ENV_CLIENT_USERNAME);
+		LOG.info("YARN daemon runs as '"+UserGroupInformation.getCurrentUser().getShortUserName()+"' setting"
+				+ " user to execute Flink ApplicationMaster/JobManager to '"+yarnClientUsername+"'");
+		UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername);
+		for(Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) {
+			ugi.addToken(toks);
+		}
+		ugi.doAs(new PrivilegedAction<Object>() {
+			@Override
+			public Object run() {
+				AMRMClient<ContainerRequest> rmClient = null;
+				ApplicationMaster am = null;
+				try {
+					Configuration conf = Utils.initializeYarnConfiguration();
+					rmClient = AMRMClient.createAMRMClient();
+					rmClient.init(conf);
+					rmClient.start();
+
+					// run the actual Application Master
+					am = new ApplicationMaster(conf);
+					// hand over the RM client right away so that shutdownAM()/close() can use it
+					// even if one of the following startup steps fails
+					am.setRMClient(rmClient);
+					am.generateConfigurationFile();
+					am.startJobManager();
+					am.run();
+				} catch (Throwable e) {
+					LOG.fatal("Error while running the application master", e);
+					// the AM is not available. Report error through the unregister function.
+					if(rmClient != null && am == null) {
+						try {
+							rmClient.unregisterApplicationMaster(FinalApplicationStatus.FAILED, "Flink YARN Application master"
+									+ " stopped unexpectedly with an exception.\n"
+									+ StringUtils.stringifyException(e), "");
+						} catch (Exception e1) {
+							LOG.fatal("Unable to fail the application master", e1);
+						}
+						LOG.info("AM unregistered from RM");
+						return null;
+					}
+					if(rmClient == null) {
+						LOG.fatal("Unable to unregister AM since the RM client is not available");
+					}
+					if(am != null) {
+						LOG.info("Writing error into internal message system");
+						am.setFailed(true);
+						am.addMessage(new Message("The application master failed with an exception:\n"
+								+ StringUtils.stringifyException(e)));
+						am.keepRPCAlive();
+					}
+				}
+				return null;
+			}
+		});
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
new file mode 100644
index 0000000..0b0b1e4
--- /dev/null
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.appMaster;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.yarn.Client;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+
+
+/**
+ * Main class started inside each TaskManager container by the ApplicationMaster.
+ *
+ * <p>It appends {@code -<ARG_CONF_DIR> <LOCAL_DIRS>} to the given arguments. It then runs
+ * {@link TaskManager#main(String[])} as the user named in the client-username environment
+ * variable, carrying over the current user's tokens.
+ * NOTE(review): the log line calls LOCAL_DIRS the "log path" although it is passed under
+ * ARG_CONF_DIR — confirm which is intended.
+ */
+public class YarnTaskManagerRunner {
+
+	private static final Log LOG = LogFactory.getLog(YarnTaskManagerRunner.class);
+
+	public static void main(final String[] args) throws IOException {
+		final Map<String, String> environment = System.getenv();
+		final String clientUser = environment.get(Client.ENV_CLIENT_USERNAME);
+		final String localDirectories = environment.get(Environment.LOCAL_DIRS.key());
+
+		// copy the original arguments and append the directory option at the end
+		final String[] taskManagerArgs = Arrays.copyOf(args, args.length + 2);
+		taskManagerArgs[args.length] = "-" + TaskManager.ARG_CONF_DIR;
+		taskManagerArgs[args.length + 1] = localDirectories;
+		LOG.info("Setting log path " + localDirectories);
+		LOG.info("YARN daemon runs as '" + UserGroupInformation.getCurrentUser().getShortUserName() + "' setting"
+				+ " user to execute Flink TaskManager to '" + clientUser + "'");
+
+		final UserGroupInformation clientUgi = UserGroupInformation.createRemoteUser(clientUser);
+		for (Token<? extends TokenIdentifier> token : UserGroupInformation.getCurrentUser().getTokens()) {
+			clientUgi.addToken(token);
+		}
+
+		final PrivilegedAction<Object> startTaskManager = new PrivilegedAction<Object>() {
+			@Override
+			public Object run() {
+				try {
+					TaskManager.main(taskManagerArgs);
+				} catch (Exception e) {
+					LOG.fatal("Error while running the TaskManager", e);
+				}
+				return null;
+			}
+		};
+		clientUgi.doAs(startTaskManager);
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/ApplicationMasterStatus.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/ApplicationMasterStatus.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/ApplicationMasterStatus.java
new file mode 100644
index 0000000..b2bdf6b
--- /dev/null
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/ApplicationMasterStatus.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn.rpc;
+
+import java.io.IOException;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+/**
+ * Class holding status information about the ApplicationMaster.
+ * The client is requesting the AM status regularly from the AM.
+ * <p>
+ * The status is transferred with {@link #write(DataOutputView)} and {@link #read(DataInputView)},
+ * which handle the fields in the same order: numTaskManagers, numSlots, messageCount, failed.
+ */
+public class ApplicationMasterStatus implements IOReadableWritable {
+
+	/** Number of TaskManagers reported by the ApplicationMaster. */
+	private int numTaskManagers;
+
+	/** Number of available slots reported by the ApplicationMaster. */
+	private int numSlots;
+
+	/**
+	 * Message count reported to the client; presumably the number of messages available via
+	 * {@code YARNClientMasterProtocol#getMessages()} -- TODO confirm.
+	 */
+	private int messageCount;
+
+	/** Whether the ApplicationMaster reports a failure. */
+	private boolean failed;
+
+	/**
+	 * Creates an empty status (all counts zero, not failed). Kept for instantiation before
+	 * {@link #read(DataInputView)} populates the fields.
+	 */
+	public ApplicationMasterStatus() {
+		// for instantiation
+	}
+
+	/**
+	 * Creates a status with the given counts, a message count of zero, and not failed.
+	 *
+	 * @param numTaskManagers number of TaskManagers
+	 * @param numSlots number of available slots
+	 */
+	public ApplicationMasterStatus(int numTaskManagers, int numSlots) {
+		this.numTaskManagers = numTaskManagers;
+		this.numSlots = numSlots;
+	}
+
+	/**
+	 * Creates a fully specified status.
+	 *
+	 * @param numTaskManagers number of TaskManagers
+	 * @param numSlots number of available slots
+	 * @param messageCount message count reported to the client
+	 * @param failed whether the ApplicationMaster reports a failure
+	 */
+	public ApplicationMasterStatus(int numTaskManagers, int numSlots,
+			int messageCount, boolean failed) {
+		this(numTaskManagers, numSlots);
+		this.messageCount = messageCount;
+		this.failed = failed;
+	}
+
+	/** @return the number of TaskManagers */
+	public int getNumberOfTaskManagers() {
+		return numTaskManagers;
+	}
+
+	/** @return the number of available slots */
+	public int getNumberOfAvailableSlots() {
+		return numSlots;
+	}
+
+	/** @return the message count reported to the client */
+	public int getMessageCount() {
+		return messageCount;
+	}
+
+	/** @param messageCount the message count to report to the client */
+	public void setMessageCount(int messageCount) {
+		this.messageCount = messageCount;
+	}
+
+	/**
+	 * Sets the failure flag. Takes a primitive so a {@code null} cannot be auto-unboxed into a
+	 * NullPointerException here.
+	 *
+	 * @param isFailed whether the ApplicationMaster reports a failure
+	 */
+	public void setFailed(boolean isFailed) {
+		this.failed = isFailed;
+	}
+
+	/** @return whether the ApplicationMaster reports a failure */
+	public boolean getFailed() {
+		return failed;
+	}
+
+	/** Serializes the fields in the order numTaskManagers, numSlots, messageCount, failed. */
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeInt(numTaskManagers);
+		out.writeInt(numSlots);
+		out.writeInt(messageCount);
+		out.writeBoolean(failed);
+	}
+
+	/** Deserializes the fields in the same order as {@link #write(DataOutputView)}. */
+	@Override
+	public void read(DataInputView in) throws IOException {
+		numTaskManagers = in.readInt();
+		numSlots = in.readInt();
+		messageCount = in.readInt();
+		failed = in.readBoolean();
+	}
+
+	@Override
+	public String toString() {
+		return "ApplicationMasterStatus{numTaskManagers=" + numTaskManagers + ", numSlots=" + numSlots
+				+ ", messageCount=" + messageCount + ", failed=" + failed + "}";
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/YARNClientMasterProtocol.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/YARNClientMasterProtocol.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/YARNClientMasterProtocol.java
new file mode 100644
index 0000000..560147c
--- /dev/null
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/YARNClientMasterProtocol.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.rpc;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.protocols.VersionedProtocol;
+import org.apache.flink.types.BooleanValue;
+
+
+/**
+ * Interface describing the methods offered by the RPC service between
+ * the Client and Application Master.
+ */
+public interface YARNClientMasterProtocol extends VersionedProtocol {
+
+	/**
+	 * A text message transferred over this protocol (see {@link #getMessages()}).
+	 */
+	public static class Message implements IOReadableWritable {
+
+		/** The message text; must not be null when the message is written. */
+		public String text;
+
+		/**
+		 * Nullary constructor, so that an empty instance can be created before
+		 * {@link #read(DataInputView)} populates it (the same pattern as the
+		 * "for instantiation" constructor of {@link ApplicationMasterStatus}).
+		 */
+		public Message() {
+			// for instantiation
+		}
+
+		/**
+		 * Creates a message with the given text.
+		 *
+		 * @param msg the message text
+		 */
+		public Message(String msg) {
+			this.text = msg;
+		}
+
+		// NOTE(review): assuming DataOutputView follows the java.io.DataOutput contract,
+		// writeUTF throws for a null text and for texts longer than 65535 encoded bytes
+		// (e.g. long stack traces) -- confirm this is acceptable.
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			out.writeUTF(text);
+		}
+
+		@Override
+		public void read(DataInputView in) throws IOException {
+			text = in.readUTF();
+		}
+	}
+
+	/**
+	 * Returns the current status of the ApplicationMaster.
+	 * (The method name contains a typo; it is kept for compatibility.)
+	 */
+	ApplicationMasterStatus getAppplicationMasterStatus();
+
+	/**
+	 * Asks the ApplicationMaster to shut down.
+	 * NOTE(review): meaning of the returned value (presumably success) -- TODO confirm.
+	 */
+	BooleanValue shutdownAM() throws Exception;
+
+	/** Returns the messages the ApplicationMaster provides to the client. */
+	List<Message> getMessages();
+
+	/**
+	 * Asks the ApplicationMaster to add TaskManagers.
+	 *
+	 * @param n number of TaskManagers to add
+	 */
+	void addTaskManagers(int n);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java b/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
index d614cc2..454c543 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
@@ -245,7 +245,7 @@ public class NepheleMiniCluster {
// ------------------------------------------------------------------------
private void waitForJobManagerToBecomeReady(int numTaskManagers) throws InterruptedException {
- while (jobManager.getNumberOfTaskTrackers() < numTaskManagers) {
+ while (jobManager.getNumberOfTaskManagers() < numTaskManagers) { // poll every 50 ms until the JobManager reports at least numTaskManagers TaskManagers; NOTE(review): no timeout, waits forever if they never appear
Thread.sleep(50);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 108db51..2983c63 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -267,6 +267,10 @@ public final class ConfigConstants {
*/
public static final String WEB_ACCESS_FILE_KEY = "webclient.access";
+	// ----------------------------- YARN Client ----------------------------
+
+	/**
+	 * The config key for the RPC port of the YARN ApplicationMaster.
+	 */
+	public static final String YARN_AM_RPC_PORT = "yarn.am.rpc.port";
+
+	/**
+	 * Misspelled alias of {@link #YARN_AM_RPC_PORT}, kept so existing references keep compiling.
+	 *
+	 * @deprecated Use {@link #YARN_AM_RPC_PORT} instead.
+	 */
+	@Deprecated
+	public static final String YARN_AM_PRC_PORT = YARN_AM_RPC_PORT;
+
// ----------------------------- Miscellaneous ----------------------------
/**
@@ -475,6 +479,11 @@ public final class ConfigConstants {
* The default path to the file containing the list of access privileged users and passwords.
*/
public static final String DEFAULT_WEB_ACCESS_FILE_PATH = null;
+
+ // ----------------------------- YARN ----------------------------
+
+ public static final int DEFAULT_YARN_AM_RPC_PORT = 10245; // default for the "yarn.am.rpc.port" key; presumably used when that key is not configured
+
// ----------------------------- LocalExecution ----------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java
index 24d2fbc..5c65131 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java
@@ -382,7 +382,7 @@ public class DefaultInstanceManager implements InstanceManager {
}
@Override
- public int getNumberOfTaskTrackers() {
+ public int getNumberOfTaskManagers() { // renamed from getNumberOfTaskTrackers(); returns the number of registered hosts
return this.registeredHosts.size();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index eed8b51..e8f8cbe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -46,9 +46,9 @@ public interface InstanceManager {
Instance getInstanceByName(String name);
- int getNumberOfTaskTrackers();
+ int getNumberOfTaskManagers(); // renamed from getNumberOfTaskTrackers(); number of TaskManagers known to this instance manager
int getNumberOfSlots();
-
+
Map<InstanceConnectionInfo, Instance> getInstances();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index 8ef6b58..82f663c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -1180,8 +1180,8 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
return this.archive;
}
- public int getNumberOfTaskTrackers() {
- return this.instanceManager.getNumberOfTaskTrackers();
+ public int getNumberOfTaskManagers() { // renamed from getNumberOfTaskTrackers(); delegates to the InstanceManager
+ return this.instanceManager.getNumberOfTaskManagers();
}
public Map<InstanceConnectionInfo, Instance> getInstances() {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
index 386315f..3ffe5af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
@@ -92,7 +92,7 @@ public class JobmanagerInfoServlet extends HttpServlet {
writeJsonForArchivedJobGroupvertex(resp.getWriter(), jobmanager.getArchive().getJob(JobID.fromHexString(jobId)), ManagementGroupVertexID.fromHexString(groupvertexId));
}
else if("taskmanagers".equals(req.getParameter("get"))) {
- resp.getWriter().write("{\"taskmanagers\": " + jobmanager.getNumberOfTaskTrackers() +"}");
+ resp.getWriter().write("{\"taskmanagers\": " + jobmanager.getNumberOfTaskManagers() +"}");
}
else if("cancel".equals(req.getParameter("get"))) {
String jobId = req.getParameter("job");
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java
index d28ea2c..5b09704 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.jobmanager.web;
import java.io.File;
@@ -31,73 +30,71 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
import org.apache.flink.util.StringUtils;
+import com.google.common.base.Preconditions;
+
public class LogfileInfoServlet extends HttpServlet {
-
+
private static final long serialVersionUID = 1L;
/**
* The log for this class.
*/
private static final Log LOG = LogFactory.getLog(LogfileInfoServlet.class);
-
- private File logDir;
-
- public LogfileInfoServlet(File logDir) {
- this.logDir = logDir;
+
+ private File[] logDirs;
+
+
+ public LogfileInfoServlet(File[] logDirs) {
+ Preconditions.checkNotNull(logDirs, "The given log files are null.");
+ this.logDirs = logDirs;
}
-
+
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
try {
if("stdout".equals(req.getParameter("get"))) {
- // Find current stdtout file
- for(File f : logDir.listFiles()) {
- // contains "jobmanager" ".log" and no number in the end ->needs improvement
- if( f.getName().equals("jobmanager-stdout.log") ||
- (f.getName().indexOf("jobmanager") != -1 && f.getName().indexOf(".out") != -1 && ! Character.isDigit(f.getName().charAt(f.getName().length() - 1) ))
- ) {
-
- resp.setStatus(HttpServletResponse.SC_OK);
- resp.setContentType("text/plain ");
- writeFile(resp.getOutputStream(), f);
- break;
- }
- }
+ // Find current stdout file
+ sendFile("jobmanager-stdout.log", resp);
}
else {
// Find current logfile
- for(File f : logDir.listFiles()) {
- // contains "jobmanager" ".log" and no number in the end ->needs improvement
- if( f.getName().equals("jobmanager-stderr.log") ||
- (f.getName().indexOf("jobmanager") != -1 && f.getName().indexOf(".log") != -1 && ! Character.isDigit(f.getName().charAt(f.getName().length() - 1) ))) {
-
- resp.setStatus(HttpServletResponse.SC_OK);
- resp.setContentType("text/plain ");
- writeFile(resp.getOutputStream(), f);
- break;
- }
-
- }
+ sendFile("jobmanager-log4j.log", resp);
}
} catch (Throwable t) {
resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
- resp.getWriter().print(t.getMessage());
+ resp.getWriter().print("Error opening log files':"+t.getMessage());
if (LOG.isWarnEnabled()) {
LOG.warn(StringUtils.stringifyException(t));
}
}
}
-
+
+	/**
+	 * Writes every regular file named {@code fileName} located directly in one of the configured
+	 * log directories to the response as plain text, each written via writeFile. Responds with
+	 * 404 (Not Found) if no log directory contains such a file.
+	 *
+	 * @param fileName name of the log file to send, e.g. "jobmanager-stdout.log"
+	 * @param resp the response to write to
+	 * @throws IOException if a file cannot be read or the response cannot be written
+	 */
+	private void sendFile(String fileName, HttpServletResponse resp) throws IOException {
+		boolean found = false;
+		for (File logDir : logDirs) {
+			// Resolve the name directly instead of scanning listFiles(): listFiles() returns null
+			// for a missing or unreadable directory, which used to abort the whole request.
+			File f = new File(logDir, fileName);
+			if (!f.isFile()) {
+				continue;
+			}
+			if (!found) {
+				// Status and content type only need to be set once, before the first output.
+				resp.setStatus(HttpServletResponse.SC_OK);
+				resp.setContentType("text/plain");
+				found = true;
+			}
+			writeFile(resp.getOutputStream(), f);
+		}
+		if (!found) {
+			resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Log file '" + fileName + "' not found.");
+		}
+	}
private static void writeFile(OutputStream out, File file) throws IOException {
byte[] buf = new byte[4 * 1024]; // 4K buffer
-
+
FileInputStream is = null;
try {
is = new FileInputStream(file);
-
+ out.write(("==== FILE: "+file.toString()+" ====\n").getBytes());
int bytesRead;
while ((bytesRead = is.read(buf)) != -1) {
out.write(buf, 0, bytesRead);
@@ -108,5 +105,5 @@ public class LogfileInfoServlet extends HttpServlet {
}
}
}
-
+
}