You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/21 09:52:44 UTC
[11/50] [abbrv] flink git commit: [FLINK-3929] Added Keytab based
Kerberos support to enable secure Flink cluster deployment (addresses HDFS,
Kafka and ZK services)
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index f4c2032..848013c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.security.SecurityContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -59,7 +60,6 @@ import java.io.IOException;
import java.io.PrintStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -341,26 +341,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
@Override
public YarnClusterClient deploy() {
-
try {
-
- UserGroupInformation.setConfiguration(conf);
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-
- if (UserGroupInformation.isSecurityEnabled()) {
- if (!ugi.hasKerberosCredentials()) {
- throw new YarnDeploymentException("In secure mode. Please provide Kerberos credentials in order to authenticate. " +
- "You may use kinit to authenticate and request a TGT from the Kerberos server.");
- }
- return ugi.doAs(new PrivilegedExceptionAction<YarnClusterClient>() {
- @Override
- public YarnClusterClient run() throws Exception {
- return deployInternal();
- }
- });
- } else {
- return deployInternal();
- }
+ return deployInternal();
} catch (Exception e) {
throw new RuntimeException("Couldn't deploy Yarn cluster", e);
}
@@ -539,9 +521,13 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
}
- addLibFolderToShipFiles(effectiveShipFiles);
+ //check if there is a JAAS config file
+ File jaasConfigFile = new File(configurationDirectory + File.separator + SecurityContext.JAAS_CONF_FILENAME);
+ if (jaasConfigFile.exists() && jaasConfigFile.isFile()) {
+ effectiveShipFiles.add(jaasConfigFile);
+ }
- final ContainerLaunchContext amContainer = setupApplicationMasterContainer(hasLogback, hasLog4j);
+ addLibFolderToShipFiles(effectiveShipFiles);
// Set-up ApplicationSubmissionContext for the application
ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
@@ -626,8 +612,53 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
fs.setPermission(sessionFilesDir, permission); // set permission for path.
+ //To support Yarn Secure Integration Test Scenario
+ //In Integration test setup, the Yarn containers created by YarnMiniCluster do not have the Yarn site XML
+ //and KRB5 configuration files. We are adding these files as container local resources for the container
+ //applications (JM/TMs) to have proper secure cluster setup
+ Path remoteKrb5Path = null;
+ Path remoteYarnSiteXmlPath = null;
+ boolean hasKrb5 = false;
+ if(System.getenv("IN_TESTS") != null) {
+ String krb5Config = System.getProperty("java.security.krb5.conf");
+ if(krb5Config != null && krb5Config.length() != 0) {
+ File krb5 = new File(krb5Config);
+ LOG.info("Adding KRB5 configuration {} to the AM container local resource bucket", krb5.getAbsolutePath());
+ LocalResource krb5ConfResource = Records.newRecord(LocalResource.class);
+ Path krb5ConfPath = new Path(krb5.getAbsolutePath());
+ remoteKrb5Path = Utils.setupLocalResource(fs, appId.toString(), krb5ConfPath, krb5ConfResource, fs.getHomeDirectory());
+ localResources.put(Utils.KRB5_FILE_NAME, krb5ConfResource);
+
+ File f = new File(System.getenv("YARN_CONF_DIR"),Utils.YARN_SITE_FILE_NAME);
+ LOG.info("Adding Yarn configuration {} to the AM container local resource bucket", f.getAbsolutePath());
+ LocalResource yarnConfResource = Records.newRecord(LocalResource.class);
+ Path yarnSitePath = new Path(f.getAbsolutePath());
+ remoteYarnSiteXmlPath = Utils.setupLocalResource(fs, appId.toString(), yarnSitePath, yarnConfResource, fs.getHomeDirectory());
+ localResources.put(Utils.YARN_SITE_FILE_NAME, yarnConfResource);
+
+ hasKrb5 = true;
+ }
+ }
+
// setup security tokens
- Utils.setTokensFor(amContainer, paths, conf);
+ LocalResource keytabResource = null;
+ Path remotePathKeytab = null;
+ String keytab = flinkConfiguration.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null);
+ if(keytab != null) {
+ LOG.info("Adding keytab {} to the AM container local resource bucket", keytab);
+ keytabResource = Records.newRecord(LocalResource.class);
+ Path keytabPath = new Path(keytab);
+ remotePathKeytab = Utils.setupLocalResource(fs, appId.toString(), keytabPath, keytabResource, fs.getHomeDirectory());
+ localResources.put(Utils.KEYTAB_FILE_NAME, keytabResource);
+ }
+
+ final ContainerLaunchContext amContainer = setupApplicationMasterContainer(hasLogback, hasLog4j, hasKrb5);
+
+ if ( UserGroupInformation.isSecurityEnabled() && keytab == null ) {
+ //set tokens only when keytab is not provided
+ LOG.info("Adding delegation token to the AM container..");
+ Utils.setTokensFor(amContainer, paths, conf);
+ }
amContainer.setLocalResources(localResources);
fs.close();
@@ -646,11 +677,25 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());
appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString());
appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, envShipFileList.toString());
- appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_USERNAME, UserGroupInformation.getCurrentUser().getShortUserName());
appMasterEnv.put(YarnConfigKeys.ENV_SLOTS, String.valueOf(slots));
appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached));
appMasterEnv.put(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE, getZookeeperNamespace());
+ // https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name
+ appMasterEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName());
+
+ if(keytabResource != null) {
+ appMasterEnv.put(YarnConfigKeys.KEYTAB_PATH, remotePathKeytab.toString() );
+ String principal = flinkConfiguration.getString(ConfigConstants.SECURITY_PRINCIPAL_KEY, null);
+ appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal );
+ }
+
+ //To support Yarn Secure Integration Test Scenario
+ if(remoteYarnSiteXmlPath != null && remoteKrb5Path != null) {
+ appMasterEnv.put(YarnConfigKeys.ENV_YARN_SITE_XML_PATH, remoteYarnSiteXmlPath.toString());
+ appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString() );
+ }
+
if(dynamicPropertiesEncoded != null) {
appMasterEnv.put(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded);
}
@@ -700,6 +745,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
throw new YarnDeploymentException("Failed to deploy the cluster: " + e.getMessage());
}
YarnApplicationState appState = report.getYarnApplicationState();
+ LOG.debug("Application State: {}", appState);
switch(appState) {
case FAILED:
case FINISHED:
@@ -996,7 +1042,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
}
- protected ContainerLaunchContext setupApplicationMasterContainer(boolean hasLogback, boolean hasLog4j) {
+ protected ContainerLaunchContext setupApplicationMasterContainer(boolean hasLogback,
+ boolean hasLog4j,
+ boolean hasKrb5) {
// ------------------ Prepare Application Master Container ------------------------------
// respect custom JVM options in the YAML file
@@ -1021,6 +1069,12 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
}
+ //applicable only for YarnMiniCluster secure test run
+ //krb5.conf file will be available as local resource in JM/TM container
+ if(hasKrb5) {
+ amCommand += " -Djava.security.krb5.conf=krb5.conf";
+ }
+
amCommand += " " + getApplicationMasterClass().getName() + " "
+ " 1>"
+ ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.out"
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 1496d61..94d4582 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -60,6 +60,14 @@ public final class Utils {
private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+ /** Keytab file name populated in YARN container */
+ public static final String KEYTAB_FILE_NAME = "krb5.keytab";
+
+ /** KRB5 file name populated in YARN container for secure IT run */
+ public static final String KRB5_FILE_NAME = "krb5.conf";
+
+ /** Yarn site xml file name populated in YARN container for secure IT run */
+ public static final String YARN_SITE_FILE_NAME = "yarn-site.xml";
/**
* See documentation
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 6619633..efb658a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.security.SecurityContext;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
@@ -40,11 +41,11 @@ import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.runtime.webmonitor.WebMonitor;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -60,11 +61,10 @@ import scala.concurrent.duration.FiniteDuration;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.security.PrivilegedAction;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
+import java.util.HashMap;
import java.util.UUID;
+import java.util.Collections;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
@@ -117,7 +117,7 @@ public class YarnApplicationMasterRunner {
/**
* The instance entry point for the YARN application master. Obtains user group
- * information and calls the main work method {@link #runApplicationMaster()} as a
+ * information and calls the main work method {@link #runApplicationMaster(Configuration)} as a
* privileged action.
*
* @param args The command line arguments.
@@ -127,34 +127,66 @@ public class YarnApplicationMasterRunner {
try {
LOG.debug("All environment variables: {}", ENV);
- final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_CLIENT_USERNAME);
+ final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
require(yarnClientUsername != null, "YARN client user name environment variable {} not set",
- YarnConfigKeys.ENV_CLIENT_USERNAME);
+ YarnConfigKeys.ENV_HADOOP_USER_NAME);
- final UserGroupInformation currentUser;
- try {
- currentUser = UserGroupInformation.getCurrentUser();
- } catch (Throwable t) {
- throw new Exception("Cannot access UserGroupInformation information for current user", t);
+ final String currDir = ENV.get(Environment.PWD.key());
+ require(currDir != null, "Current working directory variable (%s) not set", Environment.PWD.key());
+ LOG.debug("Current working Directory: {}", currDir);
+
+ final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH);
+ LOG.debug("remoteKeytabPath obtained {}", remoteKeytabPath);
+
+ final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+ LOG.info("remoteKeytabPrincipal obtained {}", remoteKeytabPrincipal);
+
+ String keytabPath = null;
+ if(remoteKeytabPath != null) {
+ File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
+ keytabPath = f.getAbsolutePath();
+ LOG.debug("keytabPath: {}", keytabPath);
}
- LOG.info("YARN daemon runs as user {}. Running Flink Application Master/JobManager as user {}",
- currentUser.getShortUserName(), yarnClientUsername);
+ UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+
+ LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
+ currentUser.getShortUserName(), yarnClientUsername );
+
+ SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration();
+
+ //To support Yarn Secure Integration Test Scenario
+ File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
+ if(krb5Conf.exists() && krb5Conf.canRead()) {
+ String krb5Path = krb5Conf.getAbsolutePath();
+ LOG.info("KRB5 Conf: {}", krb5Path);
+ org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
+ sc.setHadoopConfiguration(conf);
+ }
- UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername);
+ // Flink configuration
+ final Map<String, String> dynamicProperties =
+ FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
+ LOG.debug("YARN dynamic properties: {}", dynamicProperties);
- // transfer all security tokens, for example for authenticated HDFS and HBase access
- for (Token<?> token : currentUser.getTokens()) {
- ugi.addToken(token);
+ final Configuration flinkConfig = createConfiguration(currDir, dynamicProperties);
+ if(keytabPath != null && remoteKeytabPrincipal != null) {
+ flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
+ flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
}
+ flinkConfig.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, currDir);
- // run the actual work in a secured privileged action
- return ugi.doAs(new PrivilegedAction<Integer>() {
+ SecurityContext.install(sc.setFlinkConfiguration(flinkConfig));
+
+ return SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
@Override
public Integer run() {
- return runApplicationMaster();
+ return runApplicationMaster(flinkConfig);
}
});
+
}
catch (Throwable t) {
// make sure that everything whatever ends up in the log
@@ -172,7 +204,7 @@ public class YarnApplicationMasterRunner {
*
* @return The return code for the Java process.
*/
- protected int runApplicationMaster() {
+ protected int runApplicationMaster(Configuration config) {
ActorSystem actorSystem = null;
WebMonitor webMonitor = null;
@@ -194,12 +226,21 @@ public class YarnApplicationMasterRunner {
LOG.info("YARN assigned hostname for application master: {}", appMasterHostname);
- // Flink configuration
- final Map<String, String> dynamicProperties =
- FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
- LOG.debug("YARN dynamic properties: {}", dynamicProperties);
+ //Update keytab and principal path to reflect YARN container path location
+ final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH);
- final Configuration config = createConfiguration(currDir, dynamicProperties);
+ final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+
+ String keytabPath = null;
+ if(remoteKeytabPath != null) {
+ File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
+ keytabPath = f.getAbsolutePath();
+ LOG.info("keytabPath: {}", keytabPath);
+ }
+ if(keytabPath != null && remoteKeytabPrincipal != null) {
+ config.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
+ config.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
+ }
// Hadoop/Yarn configuration (loads config data automatically from classpath files)
final YarnConfiguration yarnConfig = new YarnConfiguration();
@@ -523,8 +564,20 @@ public class YarnApplicationMasterRunner {
String shipListString = env.get(YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
require(shipListString != null, "Environment variable %s not set", YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
- String yarnClientUsername = env.get(YarnConfigKeys.ENV_CLIENT_USERNAME);
- require(yarnClientUsername != null, "Environment variable %s not set", YarnConfigKeys.ENV_CLIENT_USERNAME);
+ String yarnClientUsername = env.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+ require(yarnClientUsername != null, "Environment variable %s not set", YarnConfigKeys.ENV_HADOOP_USER_NAME);
+
+ final String remoteKeytabPath = env.get(YarnConfigKeys.KEYTAB_PATH);
+ LOG.info("TM:remoteKeytabPath obtained {}", remoteKeytabPath);
+
+ final String remoteKeytabPrincipal = env.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+ LOG.info("TM:remoteKeytabPrincipal obtained {}", remoteKeytabPrincipal);
+
+ final String remoteYarnConfPath = env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH);
+ LOG.info("TM:remoteYarnConfPath obtained {}", remoteYarnConfPath);
+
+ final String remoteKrb5Path = env.get(YarnConfigKeys.ENV_KRB5_PATH);
+ LOG.info("TM:remotekrb5Path obtained {}", remoteKrb5Path);
String classPathString = env.get(YarnConfigKeys.ENV_FLINK_CLASSPATH);
require(classPathString != null, "Environment variable %s not set", YarnConfigKeys.ENV_FLINK_CLASSPATH);
@@ -537,6 +590,33 @@ public class YarnApplicationMasterRunner {
throw new Exception("Could not access YARN's default file system", e);
}
+ //register keytab
+ LocalResource keytabResource = null;
+ if(remoteKeytabPath != null) {
+ LOG.info("Adding keytab {} to the AM container local resource bucket", remoteKeytabPath);
+ keytabResource = Records.newRecord(LocalResource.class);
+ Path keytabPath = new Path(remoteKeytabPath);
+ Utils.registerLocalResource(yarnFileSystem, keytabPath, keytabResource);
+ }
+
+ //To support Yarn Secure Integration Test Scenario
+ LocalResource yarnConfResource = null;
+ LocalResource krb5ConfResource = null;
+ boolean hasKrb5 = false;
+ if(remoteYarnConfPath != null && remoteKrb5Path != null) {
+ LOG.info("TM:Adding remoteYarnConfPath {} to the container local resource bucket", remoteYarnConfPath);
+ yarnConfResource = Records.newRecord(LocalResource.class);
+ Path yarnConfPath = new Path(remoteYarnConfPath);
+ Utils.registerLocalResource(yarnFileSystem, yarnConfPath, yarnConfResource);
+
+ LOG.info("TM:Adding remoteKrb5Path {} to the container local resource bucket", remoteKrb5Path);
+ krb5ConfResource = Records.newRecord(LocalResource.class);
+ Path krb5ConfPath = new Path(remoteKrb5Path);
+ Utils.registerLocalResource(yarnFileSystem, krb5ConfPath, krb5ConfResource);
+
+ hasKrb5 = true;
+ }
+
// register Flink Jar with remote HDFS
LocalResource flinkJar = Records.newRecord(LocalResource.class);
{
@@ -563,6 +643,16 @@ public class YarnApplicationMasterRunner {
taskManagerLocalResources.put("flink.jar", flinkJar);
taskManagerLocalResources.put("flink-conf.yaml", flinkConf);
+ //To support Yarn Secure Integration Test Scenario
+ if(yarnConfResource != null && krb5ConfResource != null) {
+ taskManagerLocalResources.put(Utils.YARN_SITE_FILE_NAME, yarnConfResource);
+ taskManagerLocalResources.put(Utils.KRB5_FILE_NAME, krb5ConfResource);
+ }
+
+ if(keytabResource != null) {
+ taskManagerLocalResources.put(Utils.KEYTAB_FILE_NAME, keytabResource);
+ }
+
// prepare additional files to be shipped
for (String pathStr : shipListString.split(",")) {
if (!pathStr.isEmpty()) {
@@ -582,7 +672,7 @@ public class YarnApplicationMasterRunner {
String launchCommand = BootstrapTools.getTaskManagerShellCommand(
flinkConfig, tmParams, ".", ApplicationConstants.LOG_DIR_EXPANSION_VAR,
- hasLogback, hasLog4j, taskManagerMainClass);
+ hasLogback, hasLog4j, hasKrb5, taskManagerMainClass);
log.info("Starting TaskManagers with command: " + launchCommand);
@@ -597,11 +687,17 @@ public class YarnApplicationMasterRunner {
containerEnv.put(ENV_FLINK_CLASSPATH, classPathString);
Utils.setupYarnClassPath(yarnConfig, containerEnv);
- containerEnv.put(YarnConfigKeys.ENV_CLIENT_USERNAME, yarnClientUsername);
+ containerEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName());
+
+ if(remoteKeytabPath != null && remoteKeytabPrincipal != null) {
+ containerEnv.put(YarnConfigKeys.KEYTAB_PATH, remoteKeytabPath);
+ containerEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, remoteKeytabPrincipal);
+ }
ctx.setEnvironment(containerEnv);
try (DataOutputBuffer dob = new DataOutputBuffer()) {
+ LOG.debug("Adding security tokens to Task Manager Container launch Context....");
UserGroupInformation user = UserGroupInformation.getCurrentUser();
Credentials credentials = user.getCredentials();
credentials.writeTokenStorageToStream(dob);
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
index b14d7b7..ada241c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
@@ -32,7 +32,6 @@ public class YarnConfigKeys {
public final static String ENV_APP_ID = "_APP_ID";
public static final String ENV_CLIENT_HOME_DIR = "_CLIENT_HOME_DIR";
public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
- public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME";
public static final String ENV_SLOTS = "_SLOTS";
public static final String ENV_DETACHED = "_DETACHED";
public static final String ENV_DYNAMIC_PROPERTIES = "_DYNAMIC_PROPERTIES";
@@ -41,8 +40,13 @@ public class YarnConfigKeys {
public final static String FLINK_JAR_PATH = "_FLINK_JAR_PATH"; // the Flink jar resource location (in HDFS).
+ public final static String KEYTAB_PATH = "_KEYTAB_PATH";
+ public final static String KEYTAB_PRINCIPAL = "_KEYTAB_PRINCIPAL";
+ public final static String ENV_HADOOP_USER_NAME = "HADOOP_USER_NAME";
public static final String ENV_ZOOKEEPER_NAMESPACE = "_ZOOKEEPER_NAMESPACE";
+ public static final String ENV_KRB5_PATH = "_KRB5_PATH";
+ public static final String ENV_YARN_SITE_XML_PATH = "_YARN_SITE_XML_PATH";
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
index 9638137..c70a30b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
@@ -18,22 +18,22 @@
package org.apache.flink.yarn;
+import java.io.File;
import java.io.IOException;
-import java.security.PrivilegedAction;
import java.util.Map;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.security.SecurityContext;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.slf4j.Logger;
@@ -64,8 +64,18 @@ public class YarnTaskManagerRunner {
// read the environment variables for YARN
final Map<String, String> envs = System.getenv();
- final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_CLIENT_USERNAME);
+ final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
+ LOG.info("Current working/local Directory: {}", localDirs);
+
+ final String currDir = envs.get(Environment.PWD.key());
+ LOG.info("Current working Directory: {}", currDir);
+
+ final String remoteKeytabPath = envs.get(YarnConfigKeys.KEYTAB_PATH);
+ LOG.info("TM: remoteKeytabPath obtained {}", remoteKeytabPath);
+
+ final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+ LOG.info("TM: remoteKeytabPrincipal obtained {}", remoteKeytabPrincipal);
// configure local directory
String flinkTempDirs = configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null);
@@ -78,34 +88,66 @@ public class YarnTaskManagerRunner {
"specified in the Flink config: " + flinkTempDirs);
}
- LOG.info("YARN daemon runs as '" + UserGroupInformation.getCurrentUser().getShortUserName() +
- "' setting user to execute Flink TaskManager to '" + yarnClientUsername + "'");
-
// tell akka to die in case of an error
configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true);
- UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername);
- for (Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) {
- ugi.addToken(toks);
+ String keytabPath = null;
+ if(remoteKeytabPath != null) {
+ File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
+ keytabPath = f.getAbsolutePath();
+ LOG.info("keytabPath: {}", keytabPath);
}
+ UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+
+ LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
+ currentUser.getShortUserName(), yarnClientUsername );
+
// Infer the resource identifier from the environment variable
String containerID = Preconditions.checkNotNull(envs.get(YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID));
final ResourceID resourceId = new ResourceID(containerID);
LOG.info("ResourceID assigned for this container: {}", resourceId);
- ugi.doAs(new PrivilegedAction<Object>() {
- @Override
- public Object run() {
- try {
- TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, taskManager);
- }
- catch (Throwable t) {
- LOG.error("Error while starting the TaskManager", t);
- System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
- }
- return null;
+ try {
+
+ SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration();
+
+ //To support Yarn Secure Integration Test Scenario
+ File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
+ if(krb5Conf.exists() && krb5Conf.canRead()) {
+ String krb5Path = krb5Conf.getAbsolutePath();
+ LOG.info("KRB5 Conf: {}", krb5Path);
+ org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
+ sc.setHadoopConfiguration(conf);
+ }
+
+ if(keytabPath != null && remoteKeytabPrincipal != null) {
+ configuration.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
+ configuration.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
}
- });
+ configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, currDir);
+
+ SecurityContext.install(sc.setFlinkConfiguration(configuration));
+
+ SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
+ @Override
+ public Integer run() {
+ try {
+ TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, taskManager);
+ }
+ catch (Throwable t) {
+ LOG.error("Error while starting the TaskManager", t);
+ System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
+ }
+ return null;
+ }
+ });
+ } catch(Exception e) {
+ LOG.error("Exception occurred while launching Task Manager. Reason: {}", e);
+ throw new RuntimeException(e);
+ }
+
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 8f02a1c..b5364f0 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -24,11 +24,14 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.security.SecurityContext;
import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterClient;
@@ -460,9 +463,27 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
}
}
- public static void main(String[] args) {
- FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session
- System.exit(cli.run(args));
+ public static void main(final String[] args) {
+ final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "", true); // no prefix for the YARN session
+
+ String confDirPath = CliFrontend.getConfigurationDirectoryFromEnv();
+ GlobalConfiguration.loadConfiguration(confDirPath);
+ Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration();
+ flinkConfiguration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, confDirPath);
+ try {
+ SecurityContext.install(new SecurityContext.SecurityConfiguration().setFlinkConfiguration(flinkConfiguration));
+ int retCode = SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
+ @Override
+ public Integer run() {
+ return cli.run(args);
+ }
+ });
+ System.exit(retCode);
+ } catch(Exception e) {
+ e.printStackTrace();
+ LOG.error("Exception Occured. Reason: {}", e);
+ return;
+ }
}
@Override
@@ -523,6 +544,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
try {
return yarnClusterDescriptor.deploy();
} catch (Exception e) {
+ LOG.error("Error while deploying YARN cluster: "+e.getMessage(), e);
throw new RuntimeException("Error deploying the YARN cluster", e);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 02e868e..5b3148a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -107,6 +107,13 @@ under the License.
<jackson.version>2.7.4</jackson.version>
<metrics.version>3.1.0</metrics.version>
<junit.version>4.11</junit.version>
+ <!--
+ Keeping the MiniKDC version fixed instead of depending on the Hadoop version,
+ to support testing modules (Kafka, ZK, etc.) that do not have a Hadoop dependency.
+ Starting with Hadoop 3, org.apache.kerby will be used instead of MiniKDC. We may have
+ to revisit the impact at that time.
+ -->
+ <minikdc.version>2.7.2</minikdc.version>
</properties>
<dependencies>
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/tools/log4j-travis.properties
----------------------------------------------------------------------
diff --git a/tools/log4j-travis.properties b/tools/log4j-travis.properties
index 53379b4..476cee3 100644
--- a/tools/log4j-travis.properties
+++ b/tools/log4j-travis.properties
@@ -45,3 +45,6 @@ log4j.logger.org.apache.flink.runtime.leaderretrieval=DEBUG
# the tests
log4j.logger.org.apache.flink.yarn.YARNSessionFIFOITCase=INFO, console
log4j.logger.org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase=INFO, console
+log4j.logger.org.apache.flink.streaming.connectors.kafka=INFO, console
+log4j.logger.org.I0Itec.zkclient=INFO, console
+log4j.logger.org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread=OFF
\ No newline at end of file