You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/11/30 17:31:06 UTC

[1/2] flink git commit: [FLINK-4826] add keytab support to mesos container

Repository: flink
Updated Branches:
  refs/heads/master 8cdb406dc -> fe602eab2


[FLINK-4826] add keytab support to mesos container

This closes #2734.
This closes #2900.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/673c724e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/673c724e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/673c724e

Branch: refs/heads/master
Commit: 673c724e6bdc5c1f410ef8aae1fd1d4c72647f5a
Parents: 8cdb406
Author: Vijay Srinivasaraghavan <vi...@emc.com>
Authored: Thu Oct 13 15:45:35 2016 -0700
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Nov 30 18:32:02 2016 +0100

----------------------------------------------------------------------
 .../MesosApplicationMasterRunner.java           | 103 ++++++++++++++-----
 .../clusterframework/MesosConfigKeys.java       |   5 +-
 .../MesosTaskManagerRunner.java                 |  67 +++++++-----
 .../flink/runtime/security/SecurityContext.java |   1 +
 4 files changed, 124 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/673c724e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index c35fa82..bca179f 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -24,6 +24,7 @@ import akka.actor.Address;
 import akka.actor.Props;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -43,6 +44,7 @@ import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.Hardware;
@@ -66,7 +68,6 @@ import java.io.File;
 import java.net.InetAddress;
 import java.net.URL;
 import java.net.UnknownHostException;
-import java.security.PrivilegedAction;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
@@ -124,7 +125,7 @@ public class MesosApplicationMasterRunner {
 
 	/**
 	 * The instance entry point for the Mesos AppMaster. Obtains user group
-	 * information and calls the main work method {@link #runPrivileged()} as a
+	 * information and calls the main work method {@link #runPrivileged(Configuration)} as a
 	 * privileged action.
 	 *
 	 * @param args The command line arguments.
@@ -134,20 +135,27 @@ public class MesosApplicationMasterRunner {
 		try {
 			LOG.debug("All environment variables: {}", ENV);
 
-			final UserGroupInformation currentUser;
-			try {
-				currentUser = UserGroupInformation.getCurrentUser();
-			} catch (Throwable t) {
-				throw new Exception("Cannot access UserGroupInformation information for current user", t);
-			}
+			final String workingDir = ENV.get(MesosConfigKeys.ENV_MESOS_SANDBOX);
+			checkState(workingDir != null, "Sandbox directory variable (%s) not set", MesosConfigKeys.ENV_MESOS_SANDBOX);
+
+			// Flink configuration
+			final Configuration dynamicProperties =
+					FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES));
+			LOG.debug("Mesos dynamic properties: {}", dynamicProperties);
+
+			final Configuration configuration = createConfiguration(workingDir, dynamicProperties);
+
+			SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration();
+			sc.setFlinkConfiguration(configuration);
+			sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration());
+			SecurityContext.install(sc);
 
-			LOG.info("Running Flink as user {}", currentUser.getShortUserName());
+			LOG.info("Running Flink as user {}", UserGroupInformation.getCurrentUser().getShortUserName());
 
-			// run the actual work in a secured privileged action
-			return currentUser.doAs(new PrivilegedAction<Integer>() {
+			return SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
 				@Override
 				public Integer run() {
-					return runPrivileged();
+					return runPrivileged(configuration);
 				}
 			});
 		}
@@ -167,7 +175,7 @@ public class MesosApplicationMasterRunner {
 	 *
 	 * @return The return code for the Java process.
 	 */
-	protected int runPrivileged() {
+	protected int runPrivileged(Configuration config) {
 
 		ActorSystem actorSystem = null;
 		WebMonitor webMonitor = null;
@@ -179,7 +187,6 @@ public class MesosApplicationMasterRunner {
 		// configuration problem occurs
 
 		final String workingDir = ENV.get(MesosConfigKeys.ENV_MESOS_SANDBOX);
-		checkState(workingDir != null, "Sandbox directory variable (%s) not set", MesosConfigKeys.ENV_MESOS_SANDBOX);
 
 		final String sessionID = ENV.get(MesosConfigKeys.ENV_SESSION_ID);
 		checkState(sessionID != null, "Session ID (%s) not set", MesosConfigKeys.ENV_SESSION_ID);
@@ -197,13 +204,6 @@ public class MesosApplicationMasterRunner {
 			return INIT_ERROR_EXIT_CODE;
 		}
 
-		// Flink configuration
-		final Configuration dynamicProperties =
-			FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES));
-		LOG.debug("Mesos dynamic properties: {}", dynamicProperties);
-
-		final Configuration config = createConfiguration(workingDir, dynamicProperties);
-
 		// Mesos configuration
 		final MesosConfiguration mesosConfig = createMesosConfig(config, appMasterHostname);
 
@@ -456,9 +456,7 @@ public class MesosApplicationMasterRunner {
 	private static Configuration createConfiguration(String baseDirectory, Configuration additional) {
 		LOG.info("Loading config from directory {}", baseDirectory);
 
-		Configuration configuration = GlobalConfiguration.loadConfiguration(baseDirectory);
-
-		configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, baseDirectory);
+		Configuration configuration = GlobalConfiguration.loadConfiguration();
 
 		// add dynamic properties to JobManager configuration.
 		configuration.addAll(additional);
@@ -587,9 +585,6 @@ public class MesosApplicationMasterRunner {
 		String shipListString = env.get(MesosConfigKeys.ENV_CLIENT_SHIP_FILES);
 		checkState(shipListString != null, "Environment variable %s not set", MesosConfigKeys.ENV_CLIENT_SHIP_FILES);
 
-		String clientUsername = env.get(MesosConfigKeys.ENV_CLIENT_USERNAME);
-		checkState(clientUsername != null, "Environment variable %s not set", MesosConfigKeys.ENV_CLIENT_USERNAME);
-
 		String classPathString = env.get(MesosConfigKeys.ENV_FLINK_CLASSPATH);
 		checkState(classPathString != null, "Environment variable %s not set", MesosConfigKeys.ENV_FLINK_CLASSPATH);
 
@@ -597,6 +592,45 @@ public class MesosApplicationMasterRunner {
 		final File flinkJarFile = new File(workingDirectory, "flink.jar");
 		cmd.addUris(uri(artifactServer.addFile(flinkJarFile, "flink.jar"), true));
 
+		String hadoopConfDir = env.get("HADOOP_CONF_DIR");
+		LOG.debug("ENV: hadoopConfDir = {}", hadoopConfDir);
+
+		//upload Hadoop configurations to artifact server
+		boolean hadoopConf = false;
+		if(hadoopConfDir != null && hadoopConfDir.length() != 0) {
+			File source = new File(hadoopConfDir);
+			if(source.exists() && source.isDirectory()) {
+				hadoopConf = true;
+				File[] fileList = source.listFiles();
+				for(File file: fileList) {
+					if(file.getName().equals("core-site.xml") || file.getName().equals("hdfs-site.xml")) {
+						LOG.debug("Adding local file: [{}] to artifact server", file);
+						File f = new File(hadoopConfDir, file.getName());
+						cmd.addUris(uri(artifactServer.addFile(f, file.getName()), true));
+					}
+				}
+			}
+		}
+
+		//upload keytab to the artifact server
+		String keytabFileName = null;
+		String keytab = flinkConfig.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null);
+		if(keytab != null) {
+			File source = new File(keytab);
+			if(source.exists()) {
+				LOG.debug("Adding keytab file: [{}] to artifact server", source);
+				keytabFileName = source.getName();
+				cmd.addUris(uri(artifactServer.addFile(source, source.getName()), true));
+			}
+		}
+
+		String principal = flinkConfig.getString(ConfigConstants.SECURITY_PRINCIPAL_KEY, null);
+		if(keytabFileName != null && principal != null) {
+			//reset the configurations since we will use in-memory reference from within the TM instance
+			taskManagerConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY,"");
+			taskManagerConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY,"");
+		}
+
 		// register the TaskManager configuration
 		final File taskManagerConfigFile =
 			new File(workingDirectory, UUID.randomUUID() + "-taskmanager-conf.yaml");
@@ -630,7 +664,20 @@ public class MesosApplicationMasterRunner {
 			envBuilder.addVariables(variable(entry.getKey(), entry.getValue()));
 		}
 		envBuilder.addVariables(variable(MesosConfigKeys.ENV_CLASSPATH, classPathString));
-		envBuilder.addVariables(variable(MesosConfigKeys.ENV_CLIENT_USERNAME, clientUsername));
+
+		//add hadoop config directory to the environment
+		if(hadoopConf) {
+			envBuilder.addVariables(variable(MesosConfigKeys.ENV_HADOOP_CONF_DIR, "."));
+		}
+
+		//add keytab and principal to environment
+		if(keytabFileName != null && principal != null) {
+			envBuilder.addVariables(variable(MesosConfigKeys.ENV_KEYTAB, keytabFileName));
+			envBuilder.addVariables(variable(MesosConfigKeys.ENV_KEYTAB_PRINCIPAL, principal));
+		}
+
+		envBuilder.addVariables(variable(MesosConfigKeys.ENV_HADOOP_USER_NAME,
+				UserGroupInformation.getCurrentUser().getUserName()));
 
 		cmd.setEnvironment(envBuilder);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/673c724e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
index 9413c68..bc6dde4 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
@@ -30,7 +30,7 @@ public class MesosConfigKeys {
 	public static final String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
 	public static final String ENV_SLOTS = "_SLOTS";
 	public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
-	public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME";
+	public static final String ENV_HADOOP_USER_NAME = "HADOOP_USER_NAME";
 	public static final String ENV_DYNAMIC_PROPERTIES = "_DYNAMIC_PROPERTIES";
 	public static final String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID";
 	public static final String ENV_FLINK_TMP_DIR = "_FLINK_TMP_DIR";
@@ -38,6 +38,9 @@ public class MesosConfigKeys {
 	public static final String ENV_CLASSPATH = "CLASSPATH";
 	public static final String ENV_MESOS_SANDBOX = "MESOS_SANDBOX";
 	public static final String ENV_SESSION_ID = "_CLIENT_SESSION_ID";
+	public static final String ENV_HADOOP_CONF_DIR = "HADOOP_CONF_DIR";
+	public static final String ENV_KEYTAB = "_KEYTAB_FILE";
+	public static final String ENV_KEYTAB_PRINCIPAL = "_KEYTAB_PRINCIPAL";
 
 	/** Private constructor to prevent instantiation */
 	private MesosConfigKeys() {}

http://git-wip-us.apache.org/repos/asf/flink/blob/673c724e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
index ddc2097..d7544a0 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
@@ -18,14 +18,16 @@
 
 package org.apache.flink.mesos.runtime.clusterframework;
 
+import java.io.File;
 import java.io.IOException;
-import java.security.PrivilegedAction;
 import java.util.Map;
 
+import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 
@@ -33,8 +35,6 @@ import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,7 +73,7 @@ public class MesosTaskManagerRunner {
 
 		// read the environment variables
 		final Map<String, String> envs = System.getenv();
-		final String effectiveUsername = envs.get(MesosConfigKeys.ENV_CLIENT_USERNAME);
+		final String effectiveUsername = envs.get(MesosConfigKeys.ENV_HADOOP_USER_NAME);
 		final String tmpDirs = envs.get(MesosConfigKeys.ENV_FLINK_TMP_DIR);
 
 		// configure local directory
@@ -87,34 +87,55 @@ public class MesosTaskManagerRunner {
 			configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, tmpDirs);
 		}
 
-		LOG.info("Mesos task runs as '{}', setting user to execute Flink TaskManager to '{}'",
-			UserGroupInformation.getCurrentUser().getShortUserName(), effectiveUsername);
+		final String keytab = envs.get(MesosConfigKeys.ENV_KEYTAB);
+		LOG.info("Keytab file:{}", keytab);
 
-		// tell akka to die in case of an error
-		configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true);
+		final String principal = envs.get(MesosConfigKeys.ENV_KEYTAB_PRINCIPAL);
+		LOG.info("Keytab principal:{}", principal);
 
-		UserGroupInformation ugi = UserGroupInformation.createRemoteUser(effectiveUsername);
-		for (Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) {
-			ugi.addToken(toks);
+		if(keytab != null && keytab.length() != 0) {
+			File f = new File(".", keytab);
+			if(!f.exists()) {
+				LOG.error("Could not locate keytab file:[" + keytab + "]");
+				System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
+			}
+			configuration.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytab);
+			configuration.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, principal);
 		}
 
+		// tell akka to die in case of an error
+		configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true);
+
 		// Infer the resource identifier from the environment variable
 		String containerID = Preconditions.checkNotNull(envs.get(MesosConfigKeys.ENV_FLINK_CONTAINER_ID));
 		final ResourceID resourceId = new ResourceID(containerID);
 		LOG.info("ResourceID assigned for this container: {}", resourceId);
 
-		ugi.doAs(new PrivilegedAction<Object>() {
-			@Override
-			public Object run() {
-				try {
+		String hadoopConfDir = envs.get(MesosConfigKeys.ENV_HADOOP_CONF_DIR);
+		LOG.info("hadoopConfDir: {}", hadoopConfDir);
+
+		SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration();
+		sc.setFlinkConfiguration(configuration);
+		if(hadoopConfDir != null && hadoopConfDir.length() != 0) {
+			sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration());
+		}
+
+		try {
+			SecurityContext.install(sc);
+			LOG.info("Mesos task runs as '{}', setting user to execute Flink TaskManager to '{}'",
+					UserGroupInformation.getCurrentUser().getShortUserName(), effectiveUsername);
+			SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Object>() {
+				@Override
+				public Object run() throws Exception {
 					TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, taskManager);
+					return null;
 				}
-				catch (Throwable t) {
-					LOG.error("Error while starting the TaskManager", t);
-					System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
-				}
-				return null;
-			}
-		});
+			});
+		}
+		catch (Throwable t) {
+			LOG.error("Error while starting the TaskManager", t);
+			System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
+		}
+
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/673c724e/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
index 02892d3..4b7c731 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.security;
 
 import java.util.concurrent.Callable;
+			LOG.info("Hadoop security is enabled");
 
 /**
  * A security context with may be required to run a Callable.


[2/2] flink git commit: [FLINK-4918] add SSL handler to artifact server

Posted by mx...@apache.org.
[FLINK-4918] add SSL handler to artifact server

This closes #2734.
This closes #2900.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe602eab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe602eab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe602eab

Branch: refs/heads/master
Commit: fe602eab2e2772cfe663baa7b4a3df8afe138da3
Parents: 673c724
Author: Vijay Srinivasaraghavan <vi...@emc.com>
Authored: Mon Oct 31 09:54:03 2016 -0700
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Nov 30 18:32:05 2016 +0100

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  8 ++++
 .../MesosApplicationMasterRunner.java           | 14 +++----
 .../MesosTaskManagerRunner.java                 | 14 +++----
 .../flink/mesos/util/MesosArtifactServer.java   | 42 ++++++++++++++++++--
 .../flink/runtime/security/SecurityContext.java |  1 -
 5 files changed, 61 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fe602eab/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index dd17f00..6bc5e2e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -498,6 +498,11 @@ public final class ConfigConstants {
 		"mesos.resourcemanager.tasks.container.image.name";
 
 	/**
+	 * Config parameter to override SSL support for the Artifact Server
+	 */
+	public static final String MESOS_ARTIFACT_SERVER_SSL_ENABLED = "mesos.resourcemanager.artifactserver.ssl.enabled";
+
+	/**
 	 * The type of container to use for task managers. Valid values are
 	 * {@code MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_MESOS} or
 	 * {@code MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_DOCKER}.
@@ -1181,6 +1186,9 @@ public final class ConfigConstants {
 
 	public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE = "*";
 
+	/** Default value to override SSL support for the Artifact Server */
+	public static final boolean DEFAULT_MESOS_ARTIFACT_SERVER_SSL_ENABLED = true;
+
 	public static final String DEFAULT_MESOS_RESOURCEMANAGER_TASKS_CONTAINER_IMAGE_TYPE = "mesos";
 
 	// ------------------------ File System Behavior ------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/fe602eab/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index bca179f..ef58250 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -44,7 +44,7 @@ import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.process.ProcessReaper;
-import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.Hardware;
@@ -70,6 +70,7 @@ import java.net.URL;
 import java.net.UnknownHostException;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -145,16 +146,15 @@ public class MesosApplicationMasterRunner {
 
 			final Configuration configuration = createConfiguration(workingDir, dynamicProperties);
 
-			SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration();
-			sc.setFlinkConfiguration(configuration);
+			SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(configuration);
 			sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration());
-			SecurityContext.install(sc);
+			SecurityUtils.install(sc);
 
 			LOG.info("Running Flink as user {}", UserGroupInformation.getCurrentUser().getShortUserName());
 
-			return SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
+			return SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
 				@Override
-				public Integer run() {
+				public Integer call() {
 					return runPrivileged(configuration);
 				}
 			});
@@ -279,7 +279,7 @@ public class MesosApplicationMasterRunner {
 			LOG.debug("Starting Artifact Server");
 			final int artifactServerPort = config.getInteger(ConfigConstants.MESOS_ARTIFACT_SERVER_PORT_KEY,
 				ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_PORT);
-			artifactServer = new MesosArtifactServer(sessionID, akkaHostname, artifactServerPort);
+			artifactServer = new MesosArtifactServer(sessionID, akkaHostname, artifactServerPort, config);
 
 			// ----------------- (3) Generate the configuration for the TaskManagers -------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe602eab/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
index d7544a0..5100deb 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
@@ -21,13 +21,14 @@ package org.apache.flink.mesos.runtime.clusterframework;
 import java.io.File;
 import java.io.IOException;
 import java.util.Map;
+import java.util.concurrent.Callable;
 
 import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 
@@ -114,19 +115,18 @@ public class MesosTaskManagerRunner {
 		String hadoopConfDir = envs.get(MesosConfigKeys.ENV_HADOOP_CONF_DIR);
 		LOG.info("hadoopConfDir: {}", hadoopConfDir);
 
-		SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration();
-		sc.setFlinkConfiguration(configuration);
+		SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(configuration);
 		if(hadoopConfDir != null && hadoopConfDir.length() != 0) {
 			sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration());
 		}
 
 		try {
-			SecurityContext.install(sc);
+			SecurityUtils.install(sc);
 			LOG.info("Mesos task runs as '{}', setting user to execute Flink TaskManager to '{}'",
 					UserGroupInformation.getCurrentUser().getShortUserName(), effectiveUsername);
-			SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Object>() {
+			SecurityUtils.getInstalledContext().runSecured(new Callable<Object>() {
 				@Override
-				public Object run() throws Exception {
+				public Object call() throws Exception {
 					TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, taskManager);
 					return null;
 				}
@@ -138,4 +138,4 @@ public class MesosTaskManagerRunner {
 		}
 
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe602eab/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
index 6547cb3..fbf61ac 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
@@ -43,13 +43,20 @@ import io.netty.handler.codec.http.LastHttpContent;
 import io.netty.handler.codec.http.router.Handler;
 import io.netty.handler.codec.http.router.Routed;
 import io.netty.handler.codec.http.router.Router;
+import io.netty.handler.ssl.SslHandler;
 import io.netty.util.CharsetUtil;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.net.SSLUtils;
 import org.jets3t.service.utils.Mimetypes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
@@ -87,11 +94,31 @@ public class MesosArtifactServer {
 
 	private URL baseURL;
 
-	public MesosArtifactServer(String sessionID, String serverHostname, int configuredPort) throws Exception {
+	private final SSLContext serverSSLContext;
+
+	public MesosArtifactServer(String sessionID, String serverHostname, int configuredPort, Configuration config)
+			throws Exception {
 		if (configuredPort < 0 || configuredPort > 0xFFFF) {
 			throw new IllegalArgumentException("File server port is invalid: " + configuredPort);
 		}
 
+		// Config to enable https access to the artifact server
+		boolean enableSSL = config.getBoolean(
+				ConfigConstants.MESOS_ARTIFACT_SERVER_SSL_ENABLED,
+				ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_SSL_ENABLED) &&
+				SSLUtils.getSSLEnabled(config);
+
+		if (enableSSL) {
+			LOG.info("Enabling ssl for the artifact server");
+			try {
+				serverSSLContext = SSLUtils.createSSLServerContext(config);
+			} catch (Exception e) {
+				throw new IOException("Failed to initialize SSLContext for the artifact server", e);
+			}
+		} else {
+			serverSSLContext = null;
+		}
+
 		router = new Router();
 
 		ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
@@ -100,6 +127,13 @@ public class MesosArtifactServer {
 			protected void initChannel(SocketChannel ch) {
 				Handler handler = new Handler(router);
 
+				// SSL should be the first handler in the pipeline
+				if (serverSSLContext != null) {
+					SSLEngine sslEngine = serverSSLContext.createSSLEngine();
+					sslEngine.setUseClientMode(false);
+					ch.pipeline().addLast("ssl", new SslHandler(sslEngine));
+				}
+
 				ch.pipeline()
 					.addLast(new HttpServerCodec())
 					.addLast(handler.name(), handler)
@@ -123,9 +157,11 @@ public class MesosArtifactServer {
 		String address = bindAddress.getAddress().getHostAddress();
 		int port = bindAddress.getPort();
 
-		baseURL = new URL("http", serverHostname, port, "/" + sessionID + "/");
+		String httpProtocol = (serverSSLContext != null) ? "https": "http";
+
+		baseURL = new URL(httpProtocol, serverHostname, port, "/" + sessionID + "/");
 
-		LOG.info("Mesos artifact server listening at {}:{}", address, port);
+		LOG.info("Mesos Artifact Server Base URL: {}, listening at {}:{}", baseURL, address, port);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/fe602eab/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
index 4b7c731..02892d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
@@ -18,7 +18,6 @@
 package org.apache.flink.runtime.security;
 
 import java.util.concurrent.Callable;
-			LOG.info("Hadoop security is enabled");
 
 /**
  * A security context with may be required to run a Callable.