You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/11/30 17:31:06 UTC
[1/2] flink git commit: [FLINK-4826] add keytab support to mesos
container
Repository: flink
Updated Branches:
refs/heads/master 8cdb406dc -> fe602eab2
[FLINK-4826] add keytab support to mesos container
This closes #2734.
This closes #2900.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/673c724e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/673c724e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/673c724e
Branch: refs/heads/master
Commit: 673c724e6bdc5c1f410ef8aae1fd1d4c72647f5a
Parents: 8cdb406
Author: Vijay Srinivasaraghavan <vi...@emc.com>
Authored: Thu Oct 13 15:45:35 2016 -0700
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Nov 30 18:32:02 2016 +0100
----------------------------------------------------------------------
.../MesosApplicationMasterRunner.java | 103 ++++++++++++++-----
.../clusterframework/MesosConfigKeys.java | 5 +-
.../MesosTaskManagerRunner.java | 67 +++++++-----
.../flink/runtime/security/SecurityContext.java | 1 +
4 files changed, 124 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/673c724e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index c35fa82..bca179f 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -24,6 +24,7 @@ import akka.actor.Address;
import akka.actor.Props;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
@@ -43,6 +44,7 @@ import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.security.SecurityContext;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.Hardware;
@@ -66,7 +68,6 @@ import java.io.File;
import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
-import java.security.PrivilegedAction;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
@@ -124,7 +125,7 @@ public class MesosApplicationMasterRunner {
/**
* The instance entry point for the Mesos AppMaster. Obtains user group
- * information and calls the main work method {@link #runPrivileged()} as a
+ * information and calls the main work method {@link #runPrivileged(Configuration)} as a
* privileged action.
*
* @param args The command line arguments.
@@ -134,20 +135,27 @@ public class MesosApplicationMasterRunner {
try {
LOG.debug("All environment variables: {}", ENV);
- final UserGroupInformation currentUser;
- try {
- currentUser = UserGroupInformation.getCurrentUser();
- } catch (Throwable t) {
- throw new Exception("Cannot access UserGroupInformation information for current user", t);
- }
+ final String workingDir = ENV.get(MesosConfigKeys.ENV_MESOS_SANDBOX);
+ checkState(workingDir != null, "Sandbox directory variable (%s) not set", MesosConfigKeys.ENV_MESOS_SANDBOX);
+
+ // Flink configuration
+ final Configuration dynamicProperties =
+ FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES));
+ LOG.debug("Mesos dynamic properties: {}", dynamicProperties);
+
+ final Configuration configuration = createConfiguration(workingDir, dynamicProperties);
+
+ SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration();
+ sc.setFlinkConfiguration(configuration);
+ sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration());
+ SecurityContext.install(sc);
- LOG.info("Running Flink as user {}", currentUser.getShortUserName());
+ LOG.info("Running Flink as user {}", UserGroupInformation.getCurrentUser().getShortUserName());
- // run the actual work in a secured privileged action
- return currentUser.doAs(new PrivilegedAction<Integer>() {
+ return SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
@Override
public Integer run() {
- return runPrivileged();
+ return runPrivileged(configuration);
}
});
}
@@ -167,7 +175,7 @@ public class MesosApplicationMasterRunner {
*
* @return The return code for the Java process.
*/
- protected int runPrivileged() {
+ protected int runPrivileged(Configuration config) {
ActorSystem actorSystem = null;
WebMonitor webMonitor = null;
@@ -179,7 +187,6 @@ public class MesosApplicationMasterRunner {
// configuration problem occurs
final String workingDir = ENV.get(MesosConfigKeys.ENV_MESOS_SANDBOX);
- checkState(workingDir != null, "Sandbox directory variable (%s) not set", MesosConfigKeys.ENV_MESOS_SANDBOX);
final String sessionID = ENV.get(MesosConfigKeys.ENV_SESSION_ID);
checkState(sessionID != null, "Session ID (%s) not set", MesosConfigKeys.ENV_SESSION_ID);
@@ -197,13 +204,6 @@ public class MesosApplicationMasterRunner {
return INIT_ERROR_EXIT_CODE;
}
- // Flink configuration
- final Configuration dynamicProperties =
- FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES));
- LOG.debug("Mesos dynamic properties: {}", dynamicProperties);
-
- final Configuration config = createConfiguration(workingDir, dynamicProperties);
-
// Mesos configuration
final MesosConfiguration mesosConfig = createMesosConfig(config, appMasterHostname);
@@ -456,9 +456,7 @@ public class MesosApplicationMasterRunner {
private static Configuration createConfiguration(String baseDirectory, Configuration additional) {
LOG.info("Loading config from directory {}", baseDirectory);
- Configuration configuration = GlobalConfiguration.loadConfiguration(baseDirectory);
-
- configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, baseDirectory);
+ Configuration configuration = GlobalConfiguration.loadConfiguration();
// add dynamic properties to JobManager configuration.
configuration.addAll(additional);
@@ -587,9 +585,6 @@ public class MesosApplicationMasterRunner {
String shipListString = env.get(MesosConfigKeys.ENV_CLIENT_SHIP_FILES);
checkState(shipListString != null, "Environment variable %s not set", MesosConfigKeys.ENV_CLIENT_SHIP_FILES);
- String clientUsername = env.get(MesosConfigKeys.ENV_CLIENT_USERNAME);
- checkState(clientUsername != null, "Environment variable %s not set", MesosConfigKeys.ENV_CLIENT_USERNAME);
-
String classPathString = env.get(MesosConfigKeys.ENV_FLINK_CLASSPATH);
checkState(classPathString != null, "Environment variable %s not set", MesosConfigKeys.ENV_FLINK_CLASSPATH);
@@ -597,6 +592,45 @@ public class MesosApplicationMasterRunner {
final File flinkJarFile = new File(workingDirectory, "flink.jar");
cmd.addUris(uri(artifactServer.addFile(flinkJarFile, "flink.jar"), true));
+ String hadoopConfDir = env.get("HADOOP_CONF_DIR");
+ LOG.debug("ENV: hadoopConfDir = {}", hadoopConfDir);
+
+ //upload Hadoop configurations to artifact server
+ boolean hadoopConf = false;
+ if(hadoopConfDir != null && hadoopConfDir.length() != 0) {
+ File source = new File(hadoopConfDir);
+ if(source.exists() && source.isDirectory()) {
+ hadoopConf = true;
+ File[] fileList = source.listFiles();
+ for(File file: fileList) {
+ if(file.getName().equals("core-site.xml") || file.getName().equals("hdfs-site.xml")) {
+ LOG.debug("Adding local file: [{}] to artifact server", file);
+ File f = new File(hadoopConfDir, file.getName());
+ cmd.addUris(uri(artifactServer.addFile(f, file.getName()), true));
+ }
+ }
+ }
+ }
+
+ //upload keytab to the artifact server
+ String keytabFileName = null;
+ String keytab = flinkConfig.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null);
+ if(keytab != null) {
+ File source = new File(keytab);
+ if(source.exists()) {
+ LOG.debug("Adding keytab file: [{}] to artifact server", source);
+ keytabFileName = source.getName();
+ cmd.addUris(uri(artifactServer.addFile(source, source.getName()), true));
+ }
+ }
+
+ String principal = flinkConfig.getString(ConfigConstants.SECURITY_PRINCIPAL_KEY, null);
+ if(keytabFileName != null && principal != null) {
+ //reset the configurations since we will use in-memory reference from within the TM instance
+ taskManagerConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY,"");
+ taskManagerConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY,"");
+ }
+
// register the TaskManager configuration
final File taskManagerConfigFile =
new File(workingDirectory, UUID.randomUUID() + "-taskmanager-conf.yaml");
@@ -630,7 +664,20 @@ public class MesosApplicationMasterRunner {
envBuilder.addVariables(variable(entry.getKey(), entry.getValue()));
}
envBuilder.addVariables(variable(MesosConfigKeys.ENV_CLASSPATH, classPathString));
- envBuilder.addVariables(variable(MesosConfigKeys.ENV_CLIENT_USERNAME, clientUsername));
+
+ //add hadoop config directory to the environment
+ if(hadoopConf) {
+ envBuilder.addVariables(variable(MesosConfigKeys.ENV_HADOOP_CONF_DIR, "."));
+ }
+
+ //add keytab and principal to environment
+ if(keytabFileName != null && principal != null) {
+ envBuilder.addVariables(variable(MesosConfigKeys.ENV_KEYTAB, keytabFileName));
+ envBuilder.addVariables(variable(MesosConfigKeys.ENV_KEYTAB_PRINCIPAL, principal));
+ }
+
+ envBuilder.addVariables(variable(MesosConfigKeys.ENV_HADOOP_USER_NAME,
+ UserGroupInformation.getCurrentUser().getUserName()));
cmd.setEnvironment(envBuilder);
http://git-wip-us.apache.org/repos/asf/flink/blob/673c724e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
index 9413c68..bc6dde4 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
@@ -30,7 +30,7 @@ public class MesosConfigKeys {
public static final String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
public static final String ENV_SLOTS = "_SLOTS";
public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
- public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME";
+ public static final String ENV_HADOOP_USER_NAME = "HADOOP_USER_NAME";
public static final String ENV_DYNAMIC_PROPERTIES = "_DYNAMIC_PROPERTIES";
public static final String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID";
public static final String ENV_FLINK_TMP_DIR = "_FLINK_TMP_DIR";
@@ -38,6 +38,9 @@ public class MesosConfigKeys {
public static final String ENV_CLASSPATH = "CLASSPATH";
public static final String ENV_MESOS_SANDBOX = "MESOS_SANDBOX";
public static final String ENV_SESSION_ID = "_CLIENT_SESSION_ID";
+ public static final String ENV_HADOOP_CONF_DIR = "HADOOP_CONF_DIR";
+ public static final String ENV_KEYTAB = "_KEYTAB_FILE";
+ public static final String ENV_KEYTAB_PRINCIPAL = "_KEYTAB_PRINCIPAL";
/** Private constructor to prevent instantiation */
private MesosConfigKeys() {}
http://git-wip-us.apache.org/repos/asf/flink/blob/673c724e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
index ddc2097..d7544a0 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
@@ -18,14 +18,16 @@
package org.apache.flink.mesos.runtime.clusterframework;
+import java.io.File;
import java.io.IOException;
-import java.security.PrivilegedAction;
import java.util.Map;
+import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.security.SecurityContext;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.util.EnvironmentInformation;
@@ -33,8 +35,6 @@ import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,7 +73,7 @@ public class MesosTaskManagerRunner {
// read the environment variables
final Map<String, String> envs = System.getenv();
- final String effectiveUsername = envs.get(MesosConfigKeys.ENV_CLIENT_USERNAME);
+ final String effectiveUsername = envs.get(MesosConfigKeys.ENV_HADOOP_USER_NAME);
final String tmpDirs = envs.get(MesosConfigKeys.ENV_FLINK_TMP_DIR);
// configure local directory
@@ -87,34 +87,55 @@ public class MesosTaskManagerRunner {
configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, tmpDirs);
}
- LOG.info("Mesos task runs as '{}', setting user to execute Flink TaskManager to '{}'",
- UserGroupInformation.getCurrentUser().getShortUserName(), effectiveUsername);
+ final String keytab = envs.get(MesosConfigKeys.ENV_KEYTAB);
+ LOG.info("Keytab file:{}", keytab);
- // tell akka to die in case of an error
- configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true);
+ final String principal = envs.get(MesosConfigKeys.ENV_KEYTAB_PRINCIPAL);
+ LOG.info("Keytab principal:{}", principal);
- UserGroupInformation ugi = UserGroupInformation.createRemoteUser(effectiveUsername);
- for (Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) {
- ugi.addToken(toks);
+ if(keytab != null && keytab.length() != 0) {
+ File f = new File(".", keytab);
+ if(!f.exists()) {
+ LOG.error("Could not locate keytab file:[" + keytab + "]");
+ System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
+ }
+ configuration.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytab);
+ configuration.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, principal);
}
+ // tell akka to die in case of an error
+ configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true);
+
// Infer the resource identifier from the environment variable
String containerID = Preconditions.checkNotNull(envs.get(MesosConfigKeys.ENV_FLINK_CONTAINER_ID));
final ResourceID resourceId = new ResourceID(containerID);
LOG.info("ResourceID assigned for this container: {}", resourceId);
- ugi.doAs(new PrivilegedAction<Object>() {
- @Override
- public Object run() {
- try {
+ String hadoopConfDir = envs.get(MesosConfigKeys.ENV_HADOOP_CONF_DIR);
+ LOG.info("hadoopConfDir: {}", hadoopConfDir);
+
+ SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration();
+ sc.setFlinkConfiguration(configuration);
+ if(hadoopConfDir != null && hadoopConfDir.length() != 0) {
+ sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration());
+ }
+
+ try {
+ SecurityContext.install(sc);
+ LOG.info("Mesos task runs as '{}', setting user to execute Flink TaskManager to '{}'",
+ UserGroupInformation.getCurrentUser().getShortUserName(), effectiveUsername);
+ SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Object>() {
+ @Override
+ public Object run() throws Exception {
TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, taskManager);
+ return null;
}
- catch (Throwable t) {
- LOG.error("Error while starting the TaskManager", t);
- System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
- }
- return null;
- }
- });
+ });
+ }
+ catch (Throwable t) {
+ LOG.error("Error while starting the TaskManager", t);
+ System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
+ }
+
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/673c724e/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
index 02892d3..4b7c731 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.security;
import java.util.concurrent.Callable;
+ LOG.info("Hadoop security is enabled");
/**
* A security context with may be required to run a Callable.
[2/2] flink git commit: [FLINK-4918] add SSL handler to artifact
server
Posted by mx...@apache.org.
[FLINK-4918] add SSL handler to artifact server
This closes #2734.
This closes #2900.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe602eab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe602eab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe602eab
Branch: refs/heads/master
Commit: fe602eab2e2772cfe663baa7b4a3df8afe138da3
Parents: 673c724
Author: Vijay Srinivasaraghavan <vi...@emc.com>
Authored: Mon Oct 31 09:54:03 2016 -0700
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Nov 30 18:32:05 2016 +0100
----------------------------------------------------------------------
.../flink/configuration/ConfigConstants.java | 8 ++++
.../MesosApplicationMasterRunner.java | 14 +++----
.../MesosTaskManagerRunner.java | 14 +++----
.../flink/mesos/util/MesosArtifactServer.java | 42 ++++++++++++++++++--
.../flink/runtime/security/SecurityContext.java | 1 -
5 files changed, 61 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fe602eab/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index dd17f00..6bc5e2e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -498,6 +498,11 @@ public final class ConfigConstants {
"mesos.resourcemanager.tasks.container.image.name";
/**
+ * Config parameter to override SSL support for the Artifact Server
+ */
+ public static final String MESOS_ARTIFACT_SERVER_SSL_ENABLED = "mesos.resourcemanager.artifactserver.ssl.enabled";
+
+ /**
* The type of container to use for task managers. Valid values are
* {@code MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_MESOS} or
* {@code MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_DOCKER}.
@@ -1181,6 +1186,9 @@ public final class ConfigConstants {
public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE = "*";
+ /** Default value to override SSL support for the Artifact Server */
+ public static final boolean DEFAULT_MESOS_ARTIFACT_SERVER_SSL_ENABLED = true;
+
public static final String DEFAULT_MESOS_RESOURCEMANAGER_TASKS_CONTAINER_IMAGE_TYPE = "mesos";
// ------------------------ File System Behavior ------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fe602eab/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index bca179f..ef58250 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -44,7 +44,7 @@ import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.process.ProcessReaper;
-import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.Hardware;
@@ -70,6 +70,7 @@ import java.net.URL;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -145,16 +146,15 @@ public class MesosApplicationMasterRunner {
final Configuration configuration = createConfiguration(workingDir, dynamicProperties);
- SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration();
- sc.setFlinkConfiguration(configuration);
+ SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(configuration);
sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration());
- SecurityContext.install(sc);
+ SecurityUtils.install(sc);
LOG.info("Running Flink as user {}", UserGroupInformation.getCurrentUser().getShortUserName());
- return SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
+ return SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
@Override
- public Integer run() {
+ public Integer call() {
return runPrivileged(configuration);
}
});
@@ -279,7 +279,7 @@ public class MesosApplicationMasterRunner {
LOG.debug("Starting Artifact Server");
final int artifactServerPort = config.getInteger(ConfigConstants.MESOS_ARTIFACT_SERVER_PORT_KEY,
ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_PORT);
- artifactServer = new MesosArtifactServer(sessionID, akkaHostname, artifactServerPort);
+ artifactServer = new MesosArtifactServer(sessionID, akkaHostname, artifactServerPort, config);
// ----------------- (3) Generate the configuration for the TaskManagers -------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fe602eab/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
index d7544a0..5100deb 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
@@ -21,13 +21,14 @@ package org.apache.flink.mesos.runtime.clusterframework;
import java.io.File;
import java.io.IOException;
import java.util.Map;
+import java.util.concurrent.Callable;
import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.util.EnvironmentInformation;
@@ -114,19 +115,18 @@ public class MesosTaskManagerRunner {
String hadoopConfDir = envs.get(MesosConfigKeys.ENV_HADOOP_CONF_DIR);
LOG.info("hadoopConfDir: {}", hadoopConfDir);
- SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration();
- sc.setFlinkConfiguration(configuration);
+ SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(configuration);
if(hadoopConfDir != null && hadoopConfDir.length() != 0) {
sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration());
}
try {
- SecurityContext.install(sc);
+ SecurityUtils.install(sc);
LOG.info("Mesos task runs as '{}', setting user to execute Flink TaskManager to '{}'",
UserGroupInformation.getCurrentUser().getShortUserName(), effectiveUsername);
- SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Object>() {
+ SecurityUtils.getInstalledContext().runSecured(new Callable<Object>() {
@Override
- public Object run() throws Exception {
+ public Object call() throws Exception {
TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, taskManager);
return null;
}
@@ -138,4 +138,4 @@ public class MesosTaskManagerRunner {
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fe602eab/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
index 6547cb3..fbf61ac 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
@@ -43,13 +43,20 @@ import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.router.Handler;
import io.netty.handler.codec.http.router.Routed;
import io.netty.handler.codec.http.router.Router;
+import io.netty.handler.ssl.SslHandler;
import io.netty.util.CharsetUtil;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.net.SSLUtils;
import org.jets3t.service.utils.Mimetypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
import java.io.File;
import java.io.FileNotFoundException;
+import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
@@ -87,11 +94,31 @@ public class MesosArtifactServer {
private URL baseURL;
- public MesosArtifactServer(String sessionID, String serverHostname, int configuredPort) throws Exception {
+ private final SSLContext serverSSLContext;
+
+ public MesosArtifactServer(String sessionID, String serverHostname, int configuredPort, Configuration config)
+ throws Exception {
if (configuredPort < 0 || configuredPort > 0xFFFF) {
throw new IllegalArgumentException("File server port is invalid: " + configuredPort);
}
+ // Config to enable https access to the web-ui
+ boolean enableSSL = config.getBoolean(
+ ConfigConstants.MESOS_ARTIFACT_SERVER_SSL_ENABLED,
+ ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_SSL_ENABLED) &&
+ SSLUtils.getSSLEnabled(config);
+
+ if (enableSSL) {
+ LOG.info("Enabling ssl for the artifact server");
+ try {
+ serverSSLContext = SSLUtils.createSSLServerContext(config);
+ } catch (Exception e) {
+ throw new IOException("Failed to initialize SSLContext for the artifact server", e);
+ }
+ } else {
+ serverSSLContext = null;
+ }
+
router = new Router();
ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
@@ -100,6 +127,13 @@ public class MesosArtifactServer {
protected void initChannel(SocketChannel ch) {
Handler handler = new Handler(router);
+ // SSL should be the first handler in the pipeline
+ if (serverSSLContext != null) {
+ SSLEngine sslEngine = serverSSLContext.createSSLEngine();
+ sslEngine.setUseClientMode(false);
+ ch.pipeline().addLast("ssl", new SslHandler(sslEngine));
+ }
+
ch.pipeline()
.addLast(new HttpServerCodec())
.addLast(handler.name(), handler)
@@ -123,9 +157,11 @@ public class MesosArtifactServer {
String address = bindAddress.getAddress().getHostAddress();
int port = bindAddress.getPort();
- baseURL = new URL("http", serverHostname, port, "/" + sessionID + "/");
+ String httpProtocol = (serverSSLContext != null) ? "https": "http";
+
+ baseURL = new URL(httpProtocol, serverHostname, port, "/" + sessionID + "/");
- LOG.info("Mesos artifact server listening at {}:{}", address, port);
+ LOG.info("Mesos Artifact Server Base URL: {}, listening at {}:{}", baseURL, address, port);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/fe602eab/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
index 4b7c731..02892d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.security;
import java.util.concurrent.Callable;
- LOG.info("Hadoop security is enabled");
/**
* A security context with may be required to run a Callable.