You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/12/05 23:29:10 UTC

[1/3] flink git commit: [FLINK-5091] Formalize the Mesos AppMaster environment for docker compatibility

Repository: flink
Updated Branches:
  refs/heads/master 3b85f42dc -> 8d7c3ff08


http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index d844f5d..2a33c44 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -23,6 +23,8 @@ import akka.actor.ActorSystem;
 import akka.actor.Address;
 import com.typesafe.config.Config;
 
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -290,6 +292,40 @@ public class BootstrapTools {
 		config.addAll(replacement);
 	}
 
+	private static final String DYNAMIC_PROPERTIES_OPT = "D";
+
+	/**
+	 * Creates a new instance of the dynamic properties command-line option ({@code -D}).
+	 *
+	 * <p>Dynamic properties allow the user to specify additional configuration values with -D,
+	 * such as {@code -Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368}.
+	 */
+	public static Option newDynamicPropertiesOption() {
+		return new Option(DYNAMIC_PROPERTIES_OPT, true, "Dynamic properties");
+	}
+
+	/**
+	 * Parses the dynamic properties ({@code key=value} pairs passed on the command line) into a new {@link Configuration}.
+	 */
+	public static Configuration parseDynamicProperties(CommandLine cmd) {
+		final Configuration config = new Configuration();
+
+		String[] values = cmd.getOptionValues(DYNAMIC_PROPERTIES_OPT); // may be null, hence the guard below
+		if(values != null) {
+			for(String value : values) {
+				String[] pair = value.split("=", 2); // split at the first '=' only, so the value part may itself contain '='
+				if(pair.length == 1) {
+					config.setString(pair[0], Boolean.TRUE.toString()); // a bare key without '=' is stored as "true"
+				}
+				else if(pair.length == 2) {
+					config.setString(pair[0], pair[1]);
+				}
+			}
+		}
+
+		return config;
+	}
+
 	/**
 	 * Generates the shell command to start a task manager.
 	 * @param flinkConfig The Flink configuration.

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContainerSpecification.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContainerSpecification.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContainerSpecification.java
new file mode 100644
index 0000000..508a28c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContainerSpecification.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Encapsulates a container specification, including artifacts, environment variables,
+ * system properties, and Flink configuration settings.
+ *
+ * <p>The specification is mutable.
+ *
+ * <p>Note that the Flink configuration settings are considered dynamic overrides of whatever
+ * static configuration file is present in the container.  For example, a container might be
+ * based on a Docker image with a normal Flink installation with customized settings, which these
+ * settings would (partially) override.
+ *
+ * <p>Artifacts are copied into a sandbox directory within the container, which any Flink process
+ * launched in the container is assumed to use as a working directory.  This assumption allows
+ * for relative paths to be used in certain environment variables.
+ */
+public class ContainerSpecification implements java.io.Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private final Configuration systemProperties;
+
+	private final List<Artifact> artifacts;
+
+	private final Map<String,String> environmentVariables;
+
+	private final Configuration dynamicConfiguration;
+
+	public ContainerSpecification() {
+		this.artifacts = new LinkedList<>();
+		this.environmentVariables = new HashMap<String,String>();
+		this.systemProperties = new Configuration();
+		this.dynamicConfiguration = new Configuration();
+	}
+
+	/**
+	 * Get the container artifacts (the live, mutable list held by this specification, not a copy).
+	 */
+	public List<Artifact> getArtifacts() {
+		return artifacts;
+	}
+
+	/**
+	 * Get the environment variables (the live, mutable map held by this specification, not a copy).
+	 */
+	public Map<String, String> getEnvironmentVariables() {
+		return environmentVariables;
+	}
+
+	/**
+	 * Get the dynamic configuration (the instance held by this specification, not a copy).
+	 */
+	public Configuration getDynamicConfiguration() {
+		return dynamicConfiguration;
+	}
+
+	/**
+	 * Get the system properties (the instance held by this specification, not a copy).
+	 */
+	public Configuration getSystemProperties() {
+		return systemProperties;
+	}
+
+	@Override
+	protected Object clone() throws CloneNotSupportedException { // builds a fresh instance instead of calling super.clone()
+		ContainerSpecification clone = new ContainerSpecification();
+		clone.artifacts.addAll(this.artifacts); // the Artifact objects themselves are shared, not copied
+		clone.environmentVariables.putAll(this.environmentVariables);
+		clone.systemProperties.addAll(this.systemProperties);
+		clone.dynamicConfiguration.addAll(this.dynamicConfiguration);
+		return clone;
+	}
+
+	@Override
+	public String toString() {
+		return "ContainerSpecification{" +
+			"environmentVariables=" + environmentVariables +
+			", systemProperties=" + systemProperties +
+			", dynamicConfiguration=" + dynamicConfiguration +
+			", artifacts=" + artifacts +
+			'}';
+	}
+
+	/**
+	 * An artifact to be copied into the container; instances are immutable once constructed.
+	 */
+	public static class Artifact { // NOTE(review): not Serializable although the enclosing specification is - confirm it is never Java-serialized with artifacts
+
+		public Artifact(Path source, Path dest, boolean executable, boolean cachable, boolean extract) {
+			checkArgument(source.isAbsolute(), "source must be absolute");
+			checkArgument(!dest.isAbsolute(), "destination must be relative");
+			this.source = source;
+			this.dest = dest;
+			this.executable = executable;
+			this.cachable = cachable;
+			this.extract = extract;
+		}
+
+		public final Path source; // absolute path (enforced by the constructor)
+		public final Path dest; // relative path (enforced by the constructor)
+		public final boolean executable;
+		public final boolean cachable;
+		public final boolean extract;
+
+		@Override
+		public String toString() {
+			return "Artifact{" +
+				"source=" + source +
+				", dest=" + dest +
+				", executable=" + executable +
+				", cachable=" + cachable +
+				", extract=" + extract +
+				'}';
+		}
+
+		public static Builder newBuilder() { return new Builder(); }
+
+		public static class Builder { // mutable builder; the Artifact constructor validates source/dest when build() is called
+
+			public Path source;
+			public Path dest;
+			public boolean executable = false;
+			public boolean cachable = true;
+			public boolean extract = false;
+
+			public Builder setSource(Path source) {
+				this.source = source;
+				return this;
+			}
+
+			public Builder setDest(Path dest) {
+				this.dest = dest;
+				return this;
+			}
+
+			public Builder setCachable(boolean cachable) {
+				this.cachable = cachable;
+				return this;
+			}
+
+			public Builder setExtract(boolean extract) {
+				this.extract = extract;
+				return this;
+			}
+
+			public Builder setExecutable(boolean executable) {
+				this.executable = executable;
+				return this;
+			}
+
+			public Artifact build() {
+				return new Artifact(source, dest, executable, cachable, extract);
+			}
+		}
+	}
+
+	/**
+	 * Format the system properties as a shell command-line fragment of space-separated {@code -Dkey=value} arguments.
+	 */
+	public static String formatSystemProperties(Configuration jvmArgs) {
+		StringBuilder sb = new StringBuilder();
+		for(Map.Entry<String,String> entry : jvmArgs.toMap().entrySet()) {
+			if(sb.length() > 0) {
+				sb.append(" ");
+			}
+			boolean quoted = entry.getValue().contains(" "); // NOTE(review): only a space triggers quoting; quotes or other shell metacharacters in the value are not escaped
+			if(quoted) {
+				sb.append("\"");
+			}
+			sb.append("-D").append(entry.getKey()).append('=').append(entry.getValue());
+			if(quoted) {
+				sb.append("\"");
+			}
+		}
+		return sb.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/AbstractContainerOverlay.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/AbstractContainerOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/AbstractContainerOverlay.java
new file mode 100644
index 0000000..007146a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/AbstractContainerOverlay.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+
+/**
+ * An abstract container overlay providing a helper to add a file tree to a container specification.
+ */
+abstract class AbstractContainerOverlay implements ContainerOverlay {
+
+	/**
+	 * Add a path recursively to the container specification.
+	 *
+	 * <p>If the path is a directory, the directory itself (not just its contents) is added to the target path.
+	 *
+	 * <p>The execute bit is preserved; permissions aren't.  One artifact is added per visited file.
+	 *
+	 * @param sourcePath the path to add.
+	 * @param targetPath the target path.
+	 * @param env the specification to mutate.
+	 * @throws IOException if walking the file tree below {@code sourcePath} fails.
+	 */
+	protected void addPathRecursively(
+		final File sourcePath, final Path targetPath, final ContainerSpecification env) throws IOException {
+
+		final java.nio.file.Path sourceRoot = sourcePath.toPath().getParent(); // relativizing against the parent keeps the source's own name in the destination; NOTE(review): getParent() is null for a path without a parent - confirm callers never pass one
+
+		Files.walkFileTree(sourcePath.toPath(), new SimpleFileVisitor<java.nio.file.Path>() {
+			@Override
+			public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs) throws IOException {
+
+				java.nio.file.Path relativePath = sourceRoot.relativize(file);
+
+				ContainerSpecification.Artifact.Builder artifact = ContainerSpecification.Artifact.newBuilder()
+					.setSource(new Path(file.toUri()))
+					.setDest(new Path(targetPath, relativePath.toString()))
+					.setExecutable(Files.isExecutable(file))
+					.setCachable(true)
+					.setExtract(false);
+
+				env.getArtifacts().add(artifact.build());
+
+				return super.visitFile(file, attrs);
+			}
+		});
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/CompositeContainerOverlay.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/CompositeContainerOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/CompositeContainerOverlay.java
new file mode 100644
index 0000000..11e8f21
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/CompositeContainerOverlay.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A composite overlay that delegates to a list of inner overlays, applying them in list order.
+ */
+public class CompositeContainerOverlay implements ContainerOverlay {
+
+	private final List<ContainerOverlay> overlays;
+
+	public CompositeContainerOverlay(ContainerOverlay... overlays) {
+		this(Arrays.asList(overlays));
+	}
+
+	public CompositeContainerOverlay(List<ContainerOverlay> overlays) {
+		this.overlays = Collections.unmodifiableList(overlays); // read-only view; the given list is wrapped, not copied
+	}
+
+	@Override
+	public void configure(ContainerSpecification containerConfig) throws IOException {
+		for(ContainerOverlay overlay : overlays) {
+			overlay.configure(containerConfig); // an IOException from any overlay propagates and stops the iteration
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlay.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlay.java
new file mode 100644
index 0000000..62826e2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlay.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+
+import java.io.IOException;
+
+/**
+ * A container overlay to produce a container specification.
+ *
+ * <p>An overlay applies configuration elements, environment variables,
+ * system properties, and artifacts to a container specification.
+ */
+public interface ContainerOverlay {
+
+	/**
+	 * Configure (i.e. mutate) the given container specification; implementations may throw {@link IOException}.
+	 */
+	void configure(ContainerSpecification containerSpecification) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlay.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlay.java
new file mode 100644
index 0000000..a36cc67
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlay.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_BIN_DIR;
+import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR;
+import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_HOME_DIR;
+import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_LIB_DIR;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Overlays Flink into a container, based on supplied bin/conf/lib directories.
+ *
+ * <p>The overlaid Flink is indistinguishable from (and interchangeable with)
+ * a normal installation of Flink.  For a docker image-based container, it should be
+ * possible to bypass this overlay and rely on the normal installation method.
+ *
+ * <p>The following files are copied to the container:
+ *  - flink/bin/
+ *  - flink/conf/
+ *  - flink/lib/
+ */
+public class FlinkDistributionOverlay extends AbstractContainerOverlay {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkDistributionOverlay.class);
+
+	static final Path TARGET_ROOT = new Path("flink"); // relative root inside the container that receives the three directories
+
+	final File flinkBinPath;
+	final File flinkConfPath;
+	final File flinkLibPath;
+
+	public FlinkDistributionOverlay(File flinkBinPath, File flinkConfPath, File flinkLibPath) {
+		this.flinkBinPath = checkNotNull(flinkBinPath);
+		this.flinkConfPath = checkNotNull(flinkConfPath);
+		this.flinkLibPath = checkNotNull(flinkLibPath);
+	}
+
+	@Override
+	public void configure(ContainerSpecification container) throws IOException {
+
+		container.getEnvironmentVariables().put(ENV_FLINK_HOME_DIR, TARGET_ROOT.toString());
+
+		// add the paths to the container specification; each directory keeps its own name below TARGET_ROOT.
+		addPathRecursively(flinkBinPath, TARGET_ROOT, container);
+		addPathRecursively(flinkConfPath, TARGET_ROOT, container);
+		addPathRecursively(flinkLibPath, TARGET_ROOT, container);
+	}
+
+	public static Builder newBuilder() {
+		return new Builder();
+	}
+
+	/**
+	 * A builder for the {@link FlinkDistributionOverlay}; all three paths must be set before {@link #build()}.
+	 */
+	public static class Builder {
+		File flinkBinPath;
+		File flinkConfPath;
+		File flinkLibPath;
+
+		/**
+		 * Configures the overlay using the current environment.
+		 *
+		 * Locates Flink using FLINK_???_DIR environment variables as provided to all Flink processes by config.sh.
+		 *
+		 * @param globalConfiguration the current configuration (currently not read by this method).
+		 * @throws IllegalStateException if one of the bin/conf/lib environment variables is not set.
+		 */
+		public Builder fromEnvironment(Configuration globalConfiguration) {
+
+			Map<String,String> env = System.getenv();
+			if(env.containsKey(ENV_FLINK_BIN_DIR)) {
+				flinkBinPath = new File(System.getenv(ENV_FLINK_BIN_DIR));
+			}
+			else {
+				throw new IllegalStateException(String.format("the %s environment variable must be set", ENV_FLINK_BIN_DIR));
+			}
+
+			if(env.containsKey(ENV_FLINK_CONF_DIR)) {
+				flinkConfPath = new File(System.getenv(ENV_FLINK_CONF_DIR));
+			}
+			else {
+				throw new IllegalStateException(String.format("the %s environment variable must be set", ENV_FLINK_CONF_DIR));
+			}
+
+			if(env.containsKey(ENV_FLINK_LIB_DIR)) {
+				flinkLibPath = new File(System.getenv(ENV_FLINK_LIB_DIR));
+			}
+			else {
+				throw new IllegalStateException(String.format("the %s environment variable must be set", ENV_FLINK_LIB_DIR));
+			}
+
+			return this;
+		}
+
+		public FlinkDistributionOverlay build() {
+			return new FlinkDistributionOverlay(flinkBinPath, flinkConfPath, flinkLibPath); // the constructor rejects null paths via checkNotNull
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlay.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlay.java
new file mode 100644
index 0000000..bd79218
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlay.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Overlays a Hadoop configuration into a container, based on a supplied Hadoop
+ * configuration directory.
+ *
+ * <p>The following files are copied to the container:
+ *  - hadoop/conf/core-site.xml
+ *  - hadoop/conf/hdfs-site.xml
+ *
+ * <p>The following environment variables are set in the container:
+ *  - HADOOP_CONF_DIR
+ *
+ * <p>The following Flink configuration entries are updated:
+ *  - fs.hdfs.hadoopconf
+ */
+public class HadoopConfOverlay implements ContainerOverlay {
+
+	private static final Logger LOG = LoggerFactory.getLogger(HadoopConfOverlay.class);
+
+	/**
+	 * The (relative) directory into which the Hadoop conf is copied.
+	 */
+	static final Path TARGET_CONF_DIR = new Path("hadoop/conf");
+
+	final File hadoopConfDir;
+
+	public HadoopConfOverlay(@Nullable File hadoopConfDir) {
+		this.hadoopConfDir = hadoopConfDir;
+	}
+
+	@Override
+	public void configure(ContainerSpecification container) throws IOException {
+
+		if(hadoopConfDir == null) { // no Hadoop configuration directory supplied: leave the specification untouched
+			return;
+		}
+
+		File coreSitePath = new File(hadoopConfDir, "core-site.xml");
+		File hdfsSitePath = new File(hadoopConfDir, "hdfs-site.xml");
+
+		container.getEnvironmentVariables().put("HADOOP_CONF_DIR", TARGET_CONF_DIR.toString());
+		container.getDynamicConfiguration().setString(ConfigConstants.PATH_HADOOP_CONFIG, TARGET_CONF_DIR.toString());
+
+		container.getArtifacts().add(ContainerSpecification.Artifact // existence of the file is not checked here
+			.newBuilder()
+			.setSource(new Path(coreSitePath.toURI()))
+			.setDest(new Path(TARGET_CONF_DIR, coreSitePath.getName()))
+			.setCachable(true)
+			.build());
+
+		container.getArtifacts().add(ContainerSpecification.Artifact // existence of the file is not checked here
+			.newBuilder()
+			.setSource(new Path(hdfsSitePath.toURI()))
+			.setDest(new Path(TARGET_CONF_DIR, hdfsSitePath.getName()))
+			.setCachable(true)
+			.build());
+	}
+
+	public static Builder newBuilder() {
+		return new Builder();
+	}
+
+	/**
+	 * A builder for the {@link HadoopConfOverlay}; the configuration directory may remain null.
+	 */
+	public static class Builder {
+
+		File hadoopConfDir;
+
+		/**
+		 * Configures the overlay using the current environment's Hadoop configuration.
+		 *
+		 * The following locations are checked, in this order, for a Hadoop configuration:
+		 *  - (conf) fs.hdfs.hadoopconf
+		 *  - (env)  HADOOP_CONF_DIR
+		 *  - (env)  HADOOP_HOME/conf
+		 *  - (env)  HADOOP_HOME/etc/hadoop
+		 * The first location containing both core-site.xml and hdfs-site.xml is used.
+		 */
+		public Builder fromEnvironment(Configuration globalConfiguration) {
+
+			String[] possibleHadoopConfPaths = new String[4];
+			possibleHadoopConfPaths[0] = globalConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
+			possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR");
+
+			if (System.getenv("HADOOP_HOME") != null) {
+				possibleHadoopConfPaths[2] = System.getenv("HADOOP_HOME")+"/conf";
+				possibleHadoopConfPaths[3] = System.getenv("HADOOP_HOME")+"/etc/hadoop"; // hadoop 2.2
+			}
+
+			for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
+				if (possibleHadoopConfPath != null) { // unset candidates stay null in the array and are skipped
+					File confPath = new File(possibleHadoopConfPath);
+
+					File coreSitePath = new File(confPath, "core-site.xml");
+					File hdfsSitePath = new File(confPath, "hdfs-site.xml");
+
+					if (coreSitePath.exists() && hdfsSitePath.exists()) { // both files must be present for a candidate to qualify
+						this.hadoopConfDir = confPath;
+						break;
+					}
+				}
+			}
+
+			if(hadoopConfDir == null) {
+				LOG.warn("Unable to locate a Hadoop configuration; HDFS will use defaults.");
+			}
+
+			return this;
+		}
+
+		public HadoopConfOverlay build() {
+			return new HadoopConfOverlay(hadoopConfDir);
+		}
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlay.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlay.java
new file mode 100644
index 0000000..7081aea
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlay.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+
+/**
+ * Overlays a Hadoop user context into a container.
+ *
+ * <p>The overlay essentially configures Hadoop's {@link UserGroupInformation} class,
+ * establishing the effective username for filesystem calls to HDFS in non-secure clusters.
+ *
+ * <p>In secure clusters, the configured keytab establishes the effective user.
+ *
+ * <p>The following environment variables are set in the container:
+ *  - HADOOP_USER_NAME
+ */
+public class HadoopUserOverlay implements ContainerOverlay {
+
+	private static final Logger LOG = LoggerFactory.getLogger(HadoopUserOverlay.class);
+
+	private final UserGroupInformation ugi;
+
+	public HadoopUserOverlay(@Nullable UserGroupInformation ugi) {
+		this.ugi = ugi;
+	}
+
+	@Override
+	public void configure(ContainerSpecification container) throws IOException {
+		if(ugi != null) {
+			// overlay the Hadoop user identity; only the user name is exported by this method
+			container.getEnvironmentVariables().put("HADOOP_USER_NAME", ugi.getUserName());
+		}
+	}
+
+	public static Builder newBuilder() {
+		return new Builder();
+	}
+
+	/**
+	 * A builder for the {@link HadoopUserOverlay}; the user information may remain null.
+	 */
+	public static class Builder {
+
+		UserGroupInformation ugi;
+
+		/**
+		 * Configures the overlay using the current Hadoop user information (from {@link UserGroupInformation}).
+		 */
+		public Builder fromEnvironment(Configuration globalConfiguration) throws IOException { // globalConfiguration is currently not read
+			ugi = UserGroupInformation.getCurrentUser();
+			return this;
+		}
+
+		public HadoopUserOverlay build() {
+			return new HadoopUserOverlay(ugi);
+		}
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java
new file mode 100644
index 0000000..7fe5b3e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+
+
+/**
+ * Overlays cluster-level Kerberos credentials (i.e. keytab) into a container.
+ *
+ * The following Flink configuration entries are updated:
+ *  - security.keytab
+ */
+public class KeytabOverlay extends AbstractContainerOverlay {
+
+	private static final Logger LOG = LoggerFactory.getLogger(KeytabOverlay.class);
+
+	/** Destination of the keytab inside the container. */
+	static final Path TARGET_PATH = new Path("krb5.keytab");
+
+	/** Source location of the keytab; {@code null} turns the overlay into a no-op. */
+	final Path keytab;
+
+	/**
+	 * Creates the overlay from a local keytab file.
+	 *
+	 * @param keytab the keytab file, or {@code null} for no keytab.
+	 */
+	public KeytabOverlay(@Nullable File keytab) {
+		if(keytab == null) {
+			this.keytab = null;
+		}
+		else {
+			this.keytab = new Path(keytab.toURI());
+		}
+	}
+
+	/**
+	 * Creates the overlay from a keytab path.
+	 *
+	 * @param keytab the keytab path, or {@code null} for no keytab.
+	 */
+	public KeytabOverlay(@Nullable Path keytab) {
+		this.keytab = keytab;
+	}
+
+	/**
+	 * Registers the keytab as a non-cachable artifact at {@link #TARGET_PATH} and points the
+	 * dynamic configuration entry for the keytab at that location.
+	 *
+	 * @param container the container specification to update.
+	 */
+	@Override
+	public void configure(ContainerSpecification container) throws IOException {
+		if(keytab == null) {
+			// nothing to ship
+			return;
+		}
+
+		container.getArtifacts().add(
+			ContainerSpecification.Artifact.newBuilder()
+				.setSource(keytab)
+				.setDest(TARGET_PATH)
+				.setCachable(false)
+				.build());
+
+		container.getDynamicConfiguration().setString(
+			ConfigConstants.SECURITY_KEYTAB_KEY,
+			TARGET_PATH.getPath());
+	}
+
+	/**
+	 * Creates an empty builder for this overlay.
+	 */
+	public static Builder newBuilder() {
+		return new Builder();
+	}
+
+	/**
+	 * A builder for the {@link KeytabOverlay}.
+	 */
+	public static class Builder {
+
+		/** The keytab handed to the overlay by {@link #build()}; stays {@code null} until configured. */
+		File keytabPath;
+
+		/**
+		 * Configures the overlay using the current environment (and global configuration).
+		 *
+		 * The following Flink configuration settings are checked for a keytab:
+		 *  - security.keytab
+		 *
+		 * @param globalConfiguration the configuration to read the keytab location from.
+		 * @return this builder.
+		 * @throws IllegalStateException if a keytab is configured but does not exist.
+		 */
+		public Builder fromEnvironment(Configuration globalConfiguration) {
+			final String configured = globalConfiguration.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null);
+			if(configured == null) {
+				// no keytab configured
+				return this;
+			}
+
+			keytabPath = new File(configured);
+			if(keytabPath.exists()) {
+				return this;
+			}
+
+			throw new IllegalStateException("Invalid configuration for " +
+				ConfigConstants.SECURITY_KEYTAB_KEY +
+				"; '" + configured + "' not found.");
+		}
+
+		/**
+		 * Creates the overlay from the keytab collected so far.
+		 */
+		public KeytabOverlay build() {
+			return new KeytabOverlay(keytabPath);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlay.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlay.java
new file mode 100644
index 0000000..fb161b9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlay.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+
+
+/**
+ * Overlays a Kerberos configuration file into a container.
+ *
+ * The following files are copied to the container:
+ *  - krb5.conf
+ *
+ * The following Java system properties are set in the container:
+ *  - java.security.krb5.conf
+ */
+public class Krb5ConfOverlay extends AbstractContainerOverlay {
+
+	private static final Logger LOG = LoggerFactory.getLogger(Krb5ConfOverlay.class);
+
+	/** Name of the Java system property that locates the Kerberos configuration file. */
+	static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf";
+
+	/** Destination of the configuration file inside the container. */
+	static final Path TARGET_PATH = new Path("krb5.conf");
+
+	/** Source location of the configuration file; {@code null} turns the overlay into a no-op. */
+	final Path krb5Conf;
+
+	/**
+	 * Creates the overlay from a local krb5.conf file.
+	 *
+	 * @param krb5Conf the configuration file, or {@code null} for none.
+	 */
+	public Krb5ConfOverlay(@Nullable File krb5Conf) {
+		this.krb5Conf = krb5Conf != null ? new Path(krb5Conf.toURI()) : null;
+	}
+
+	/**
+	 * Creates the overlay from a krb5.conf path.
+	 *
+	 * @param krb5Conf the configuration path, or {@code null} for none.
+	 */
+	public Krb5ConfOverlay(@Nullable Path krb5Conf) {
+		this.krb5Conf = krb5Conf;
+	}
+
+	/**
+	 * Registers the configuration file as a cachable artifact at {@link #TARGET_PATH} and sets the
+	 * {@code java.security.krb5.conf} system property of the container to that location.
+	 *
+	 * @param container the container specification to update.
+	 */
+	@Override
+	public void configure(ContainerSpecification container) throws IOException {
+		if(krb5Conf != null) {
+			container.getArtifacts().add(ContainerSpecification.Artifact.newBuilder()
+				.setSource(krb5Conf)
+				.setDest(TARGET_PATH)
+				.setCachable(true)
+				.build());
+			container.getSystemProperties().setString(JAVA_SECURITY_KRB5_CONF, TARGET_PATH.getPath());
+		}
+	}
+
+	/**
+	 * Creates an empty builder for this overlay.
+	 */
+	public static Builder newBuilder() {
+		return new Builder();
+	}
+
+	/**
+	 * A builder for the {@link Krb5ConfOverlay}.
+	 */
+	public static class Builder {
+
+		/** The file handed to the overlay by {@link #build()}; stays {@code null} until configured. */
+		File krb5ConfPath;
+
+		/**
+		 * Configures the overlay using the current environment.
+		 *
+		 * Locates the krb5.conf configuration file as per
+		 * <a href="https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/KerberosReq.html">Java documentation</a>.
+		 * Note that the JRE doesn't support the KRB5_CONFIG environment variable (JDK-7045913).
+		 *
+		 * @param globalConfiguration the global configuration (not consulted by this builder).
+		 * @return this builder.
+		 * @throws IllegalStateException if the system property names a file that does not exist.
+		 */
+		public Builder fromEnvironment(Configuration globalConfiguration) {
+
+			// check the system property
+			String krb5Config = System.getProperty(JAVA_SECURITY_KRB5_CONF);
+			if(krb5Config != null && krb5Config.length() != 0) {
+				// validate before storing, so that a failed call leaves the builder unchanged
+				File candidate = new File(krb5Config);
+				if(!candidate.exists()) {
+					throw new IllegalStateException(JAVA_SECURITY_KRB5_CONF +
+						" refers to a non-existent file: '" + krb5Config + "'");
+				}
+				krb5ConfPath = candidate;
+			}
+
+			// FUTURE: check the well-known paths
+			// - $JAVA_HOME/lib/security
+			// - %WINDIR%\krb5.ini (Windows)
+			// - /etc/krb5.conf (Linux)
+
+			return this;
+		}
+
+		/**
+		 * Creates the overlay from the file collected so far.
+		 */
+		public Krb5ConfOverlay build() {
+			return new Krb5ConfOverlay(krb5ConfPath);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlay.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlay.java
new file mode 100644
index 0000000..dd79ca1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlay.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+
+
+/**
+ * Overlays an SSL keystore/truststore into a container.
+ *
+ * The following files are placed into the container:
+ *  - keystore.jks
+ *  - truststore.jks
+ *
+ * The following Flink configuration entries are set:
+ *  - security.ssl.keystore
+ *  - security.ssl.truststore
+ */
+public class SSLStoreOverlay extends AbstractContainerOverlay {
+
+	private static final Logger LOG = LoggerFactory.getLogger(SSLStoreOverlay.class);
+
+	/** Destination of the keystore inside the container. */
+	static final Path TARGET_KEYSTORE_PATH = new Path("keystore.jks");
+
+	/** Destination of the truststore inside the container. */
+	static final Path TARGET_TRUSTSTORE_PATH = new Path("truststore.jks");
+
+	/** Source location of the keystore; {@code null} means no keystore is shipped. */
+	final Path keystore;
+
+	/** Source location of the truststore; {@code null} means no truststore is shipped. */
+	final Path truststore;
+
+	/**
+	 * Creates the overlay from local store files.
+	 *
+	 * @param keystoreFile the keystore file, or {@code null} for none.
+	 * @param truststoreFile the truststore file, or {@code null} for none.
+	 */
+	public SSLStoreOverlay(@Nullable File keystoreFile, @Nullable File truststoreFile) {
+		this.keystore = keystoreFile != null ? new Path(keystoreFile.toURI()) : null;
+		this.truststore = truststoreFile != null ? new Path(truststoreFile.toURI()) : null;
+	}
+
+	/**
+	 * Registers each configured store as a non-cachable artifact and points the corresponding
+	 * dynamic configuration entry at the store's location inside the container.
+	 *
+	 * @param container the container specification to update.
+	 */
+	@Override
+	public void configure(ContainerSpecification container) throws IOException {
+		if(keystore != null) {
+			container.getArtifacts().add(ContainerSpecification.Artifact.newBuilder()
+				.setSource(keystore)
+				.setDest(TARGET_KEYSTORE_PATH)
+				.setCachable(false)
+				.build());
+			container.getDynamicConfiguration().setString(ConfigConstants.SECURITY_SSL_KEYSTORE, TARGET_KEYSTORE_PATH.getPath());
+		}
+		if(truststore != null) {
+			container.getArtifacts().add(ContainerSpecification.Artifact.newBuilder()
+				.setSource(truststore)
+				.setDest(TARGET_TRUSTSTORE_PATH)
+				.setCachable(false)
+				.build());
+			container.getDynamicConfiguration().setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, TARGET_TRUSTSTORE_PATH.getPath());
+		}
+	}
+
+	/**
+	 * Creates an empty builder for this overlay.
+	 */
+	public static Builder newBuilder() {
+		return new Builder();
+	}
+
+	/**
+	 * A builder for the {@link SSLStoreOverlay}.
+	 */
+	public static class Builder {
+
+		/** The keystore handed to the overlay by {@link #build()}; stays {@code null} until configured. */
+		File keystorePath;
+
+		/** The truststore handed to the overlay by {@link #build()}; stays {@code null} until configured. */
+		File truststorePath;
+
+		/**
+		 * Configures the overlay using the current environment (and global configuration).
+		 *
+		 * The following Flink configuration settings are used to source the keystore and truststore:
+		 *  - security.ssl.keystore
+		 *  - security.ssl.truststore
+		 *
+		 * @param globalConfiguration the configuration to read the store locations from.
+		 * @return this builder.
+		 * @throws IllegalStateException if a configured store does not exist.
+		 */
+		public Builder fromEnvironment(Configuration globalConfiguration)  {
+
+			String keystore = globalConfiguration.getString(ConfigConstants.SECURITY_SSL_KEYSTORE, null);
+			if(keystore != null) {
+				// validate before storing, so that a failed lookup does not leave a bad path behind
+				File candidate = new File(keystore);
+				if(!candidate.exists()) {
+					throw new IllegalStateException("Invalid configuration for " +
+						ConfigConstants.SECURITY_SSL_KEYSTORE +
+						"; '" + keystore + "' not found.");
+				}
+				keystorePath = candidate;
+			}
+
+			String truststore = globalConfiguration.getString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, null);
+			if(truststore != null) {
+				File candidate = new File(truststore);
+				if(!candidate.exists()) {
+					throw new IllegalStateException("Invalid configuration for " +
+						ConfigConstants.SECURITY_SSL_TRUSTSTORE +
+						"; '" + truststore + "' not found.");
+				}
+				truststorePath = candidate;
+			}
+
+			return this;
+		}
+
+		/**
+		 * Creates the overlay from the stores collected so far.
+		 */
+		public SSLStoreOverlay build() {
+			return new SSLStoreOverlay(keystorePath, truststorePath);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
index f6e0a8c..7416cc6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
@@ -53,7 +53,7 @@ public class SecurityUtils {
 
 	public static final String JAAS_CONF_FILENAME = "flink-jaas.conf";
 
-	private static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config";
+	public static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config";
 
 	private static final String ZOOKEEPER_SASL_CLIENT = "zookeeper.sasl.client";
 
@@ -130,6 +130,8 @@ public class SecurityUtils {
 				loginUser = UserGroupInformation.getLoginUser();
 			}
 
+			LOG.info("Hadoop user set to {}", loginUser.toString());
+
 			boolean delegationToken = false;
 			final Text HDFS_DELEGATION_KIND = new Text("HDFS_DELEGATION_TOKEN");
 			Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens();

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlayTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlayTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlayTestBase.java
new file mode 100644
index 0000000..bbea376
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlayTestBase.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ContainerOverlayTestBase {
+
+	/** Copy of the process environment taken before each test. */
+	private Map<String, String> originalEnvironment;
+
+	/**
+	 * Remembers the process environment so that {@link #after()} can put it back.
+	 */
+	@Before
+	public void before() {
+		final Map<String, String> snapshot = new HashMap<>();
+		snapshot.putAll(System.getenv());
+		originalEnvironment = snapshot;
+	}
+
+	/**
+	 * Re-installs the environment remembered by {@link #before()}.
+	 */
+	@After
+	public void after() {
+		CommonTestUtils.setEnv(originalEnvironment, true);
+	}
+
+
+	/**
+	 * Creates one file (and any missing parent folders) per given relative path.
+	 *
+	 * @param root the root folder in which to create the files.
+	 * @param paths the relative paths to create.
+	 * @return the given relative paths as {@link Path} objects, in the same order.
+	 */
+	protected static Path[] createPaths(File root, String... paths) throws Exception {
+		final Path[] created = new Path[paths.length];
+		int index = 0;
+		for(String relative : paths) {
+			final File target = root.toPath().resolve(relative).toFile();
+			target.getParentFile().mkdirs();
+			target.createNewFile();
+			created[index++] = new Path(relative);
+		}
+		return created;
+	}
+
+	/**
+	 * Looks up the artifact whose destination equals the given remote path.
+	 *
+	 * @param spec the container specification to search.
+	 * @param remotePath the expected destination of the artifact.
+	 * @return the first matching artifact.
+	 * @throws AssertionError if no artifact has the given destination.
+	 */
+	protected static ContainerSpecification.Artifact checkArtifact(ContainerSpecification spec, Path remotePath) {
+		for(ContainerSpecification.Artifact candidate : spec.getArtifacts()) {
+			if(!remotePath.equals(candidate.dest)) {
+				continue;
+			}
+			return candidate;
+		}
+		throw new AssertionError("no such artifact (" + remotePath + ")");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlayTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlayTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlayTest.java
new file mode 100644
index 0000000..e77dd3a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlayTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_BIN_DIR;
+import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR;
+import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_LIB_DIR;
+
+import static org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay.TARGET_ROOT;
+
+public class FlinkDistributionOverlayTest extends ContainerOverlayTestBase {
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	/**
+	 * Checks that every file created below the bin and lib folders shows up as an artifact
+	 * at the same relative path below TARGET_ROOT.
+	 */
+	@Test
+	public void testConfigure() throws Exception {
+
+		File binFolder = tempFolder.newFolder("bin");
+		File libFolder = tempFolder.newFolder("lib");
+		File confFolder = tempFolder.newFolder("conf");
+
+		Path[] files = createPaths(
+			tempFolder.getRoot(),
+			"bin/config.sh",
+			"bin/taskmanager.sh",
+			"lib/foo.jar",
+			"lib/A/foo.jar",
+			"lib/B/foo.jar",
+			"lib/B/bar.jar");
+
+		ContainerSpecification containerSpecification = new ContainerSpecification();
+		FlinkDistributionOverlay overlay = new FlinkDistributionOverlay(
+			binFolder,
+			confFolder,
+			libFolder
+		);
+		overlay.configure(containerSpecification);
+
+		for(Path file : files) {
+			checkArtifact(containerSpecification, new Path(TARGET_ROOT, file.toString()));
+		}
+	}
+
+	/**
+	 * Checks that the builder reads the bin, lib and conf folders from the corresponding
+	 * environment variables.
+	 */
+	@Test
+	public void testBuilderFromEnvironment() throws Exception {
+		Configuration conf = new Configuration();
+
+		File binFolder = tempFolder.newFolder("bin");
+		File libFolder = tempFolder.newFolder("lib");
+		File confFolder = tempFolder.newFolder("conf");
+
+		// adjust the test environment for the purposes of this test
+		Map<String, String> map = new HashMap<>(System.getenv());
+		map.put(ENV_FLINK_BIN_DIR, binFolder.getAbsolutePath());
+		map.put(ENV_FLINK_LIB_DIR, libFolder.getAbsolutePath());
+		map.put(ENV_FLINK_CONF_DIR, confFolder.getAbsolutePath());
+		CommonTestUtils.setEnv(map);
+
+		FlinkDistributionOverlay.Builder builder = FlinkDistributionOverlay.newBuilder().fromEnvironment(conf);
+
+		assertEquals(binFolder.getAbsolutePath(), builder.flinkBinPath.getAbsolutePath());
+		assertEquals(libFolder.getAbsolutePath(), builder.flinkLibPath.getAbsolutePath());
+		assertEquals(confFolder.getAbsolutePath(), builder.flinkConfPath.getAbsolutePath());
+	}
+
+	/**
+	 * Checks that the builder rejects an environment in which none of the Flink folder
+	 * variables is set.
+	 */
+	@Test
+	public void testBuilderFromEnvironmentBad() throws Exception {
+		Configuration conf = new Configuration();
+
+		// adjust the test environment for the purposes of this test
+		Map<String, String> map = new HashMap<>(System.getenv());
+		map.remove(ENV_FLINK_BIN_DIR);
+		map.remove(ENV_FLINK_LIB_DIR);
+		map.remove(ENV_FLINK_CONF_DIR);
+		CommonTestUtils.setEnv(map);
+
+		try {
+			// the return value is irrelevant; the call itself is expected to throw
+			FlinkDistributionOverlay.newBuilder().fromEnvironment(conf);
+			fail("expected an IllegalStateException because the Flink directories are not set");
+		}
+		catch(IllegalStateException e) {
+			// expected
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlayTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlayTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlayTest.java
new file mode 100644
index 0000000..c3ea41b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlayTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+import static org.apache.flink.runtime.clusterframework.overlays.HadoopConfOverlay.TARGET_CONF_DIR;
+
+/**
+ * Tests for the {@link HadoopConfOverlay} and its builder.
+ */
+public class HadoopConfOverlayTest extends ContainerOverlayTestBase {
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	/**
+	 * Checks that configuring the overlay with a populated conf folder sets HADOOP_CONF_DIR and the
+	 * Hadoop config path to TARGET_CONF_DIR, and registers core-site.xml and hdfs-site.xml as artifacts.
+	 */
+	@Test
+	public void testConfigure() throws Exception {
+
+		File confDir = tempFolder.newFolder();
+		initConfDir(confDir);
+
+		HadoopConfOverlay overlay = new HadoopConfOverlay(confDir);
+
+		ContainerSpecification spec = new ContainerSpecification();
+		overlay.configure(spec);
+
+		assertEquals(TARGET_CONF_DIR.getPath(), spec.getEnvironmentVariables().get("HADOOP_CONF_DIR"));
+		assertEquals(TARGET_CONF_DIR.getPath(), spec.getDynamicConfiguration().getString(ConfigConstants.PATH_HADOOP_CONFIG, null));
+
+		checkArtifact(spec, new Path(TARGET_CONF_DIR, "core-site.xml"));
+		checkArtifact(spec, new Path(TARGET_CONF_DIR, "hdfs-site.xml"));
+	}
+
+	/**
+	 * Checks that an overlay without a conf folder can be applied; the test passes as long as
+	 * {@code configure} completes without throwing.
+	 */
+	@Test
+	public void testNoConf() throws Exception {
+		HadoopConfOverlay overlay = new HadoopConfOverlay(null);
+
+		ContainerSpecification containerSpecification = new ContainerSpecification();
+		overlay.configure(containerSpecification);
+	}
+
+	/**
+	 * Checks each location the builder is expected to find the Hadoop conf folder in. The cases
+	 * run in sequence and each one replaces the process environment before invoking the builder.
+	 */
+	@Test
+	public void testBuilderFromEnvironment() throws Exception {
+
+		// verify that the builder picks up various environment locations
+		HadoopConfOverlay.Builder builder;
+		Map<String, String> env;
+
+		// case 1: the folder is named by the Flink configuration entry (fs.hdfs.hadoopconf)
+		File confDir = tempFolder.newFolder();
+		initConfDir(confDir);
+		Configuration conf = new Configuration();
+		conf.setString(ConfigConstants.PATH_HADOOP_CONFIG, confDir.getAbsolutePath());
+		builder = HadoopConfOverlay.newBuilder().fromEnvironment(conf);
+		assertEquals(confDir, builder.hadoopConfDir);
+
+		// case 2: the folder is named by HADOOP_CONF_DIR (HADOOP_HOME is removed, configuration is empty)
+		env = new HashMap<String, String>(System.getenv());
+		env.remove("HADOOP_HOME");
+		env.put("HADOOP_CONF_DIR", confDir.getAbsolutePath());
+		CommonTestUtils.setEnv(env);
+		builder = HadoopConfOverlay.newBuilder().fromEnvironment(new Configuration());
+		assertEquals(confDir, builder.hadoopConfDir);
+
+		// case 3: the folder is HADOOP_HOME/conf (HADOOP_CONF_DIR from case 2 is removed again)
+		File homeDir = tempFolder.newFolder();
+		confDir = initConfDir(new File(homeDir, "conf"));
+		env = new HashMap<String, String>(System.getenv());
+		env.remove("HADOOP_CONF_DIR");
+		env.put("HADOOP_HOME", homeDir.getAbsolutePath());
+		CommonTestUtils.setEnv(env);
+		builder = HadoopConfOverlay.newBuilder().fromEnvironment(new Configuration());
+		assertEquals(confDir, builder.hadoopConfDir);
+
+		// case 4: the folder is HADOOP_HOME/etc/hadoop (a fresh home folder without a conf subfolder)
+		homeDir = tempFolder.newFolder();
+		confDir = initConfDir(new File(homeDir, "etc/hadoop"));
+		env = new HashMap<String, String>(System.getenv());
+		env.remove("HADOOP_CONF_DIR");
+		env.put("HADOOP_HOME", homeDir.getAbsolutePath());
+		CommonTestUtils.setEnv(env);
+		builder = HadoopConfOverlay.newBuilder().fromEnvironment(new Configuration());
+		assertEquals(confDir, builder.hadoopConfDir);
+	}
+
+	/**
+	 * Creates the given folder (including parents) with empty core-site.xml and hdfs-site.xml files.
+	 *
+	 * @param confDir the folder to create and populate.
+	 * @return the given folder.
+	 */
+	private File initConfDir(File confDir) throws Exception {
+		confDir.mkdirs();
+		new File(confDir, "core-site.xml").createNewFile();
+		new File(confDir, "hdfs-site.xml").createNewFile();
+		return confDir;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlayTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlayTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlayTest.java
new file mode 100644
index 0000000..7a463b8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlayTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Test;
+
+import java.security.PrivilegedAction;
+
+import static org.junit.Assert.assertEquals;
+
+public class HadoopUserOverlayTest extends ContainerOverlayTestBase {
+
+	/**
+	 * Checks that the overlay exports the user name as HADOOP_USER_NAME.
+	 */
+	@Test
+	public void testConfigure() throws Exception {
+
+		final UserGroupInformation ugi = UserGroupInformation.createRemoteUser("test");
+		final ContainerSpecification spec = new ContainerSpecification();
+
+		new HadoopUserOverlay(ugi).configure(spec);
+
+		final String exported = spec.getEnvironmentVariables().get("HADOOP_USER_NAME");
+		assertEquals(ugi.getUserName(), exported);
+	}
+
+	/**
+	 * Checks that an overlay without a user can be applied; the test passes as long as
+	 * {@code configure} completes without throwing.
+	 */
+	@Test
+	public void testNoConf() throws Exception {
+		final ContainerSpecification containerSpecification = new ContainerSpecification();
+
+		new HadoopUserOverlay(null).configure(containerSpecification);
+	}
+
+	/**
+	 * Checks that the builder, when invoked on behalf of a user, picks up exactly that user.
+	 */
+	@Test
+	public void testBuilderFromEnvironment() throws Exception {
+
+		final Configuration conf = new Configuration();
+		final UserGroupInformation ugi = UserGroupInformation.createRemoteUser("test");
+
+		final PrivilegedAction<Object> builderCheck = new PrivilegedAction<Object>() {
+			@Override
+			public Object run() {
+				try {
+					final HadoopUserOverlay.Builder configured = HadoopUserOverlay.newBuilder().fromEnvironment(conf);
+					assertEquals(ugi, configured.ugi);
+				}
+				catch(Exception ex) {
+					// surface checked failures from the builder as test failures
+					throw new AssertionError(ex);
+				}
+				return null;
+			}
+		};
+
+		ugi.doAs(builderCheck);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java
new file mode 100644
index 0000000..0570f28
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+import static org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay.TARGET_PATH;
+import static org.junit.Assert.assertEquals;
+
+public class KeytabOverlayTest extends ContainerOverlayTestBase {
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Test
+	public void testConfigure() throws Exception {
+
+		File keytab = tempFolder.newFile();
+
+		KeytabOverlay overlay = new KeytabOverlay(keytab);
+
+		ContainerSpecification spec = new ContainerSpecification();
+		overlay.configure(spec);
+
+		assertEquals(TARGET_PATH.getPath(), spec.getDynamicConfiguration().getString(ConfigConstants.SECURITY_KEYTAB_KEY, null));
+		checkArtifact(spec, TARGET_PATH);
+	}
+
+	@Test
+	public void testNoConf() throws Exception {
+		KeytabOverlay overlay = new KeytabOverlay((Path) null);
+
+		ContainerSpecification containerSpecification = new ContainerSpecification();
+		overlay.configure(containerSpecification);
+	}
+
+	@Test
+	public void testBuilderFromEnvironment() throws Exception {
+
+		final Configuration conf = new Configuration();
+		File keytab = tempFolder.newFile();
+
+		conf.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytab.getAbsolutePath());
+		KeytabOverlay.Builder builder = KeytabOverlay.newBuilder().fromEnvironment(conf);
+		assertEquals(builder.keytabPath, keytab);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlayTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlayTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlayTest.java
new file mode 100644
index 0000000..1f86b89
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlayTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+import static org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay.JAVA_SECURITY_KRB5_CONF;
+import static org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay.TARGET_PATH;
+import static org.junit.Assert.assertEquals;
+
+public class Krb5ConfOverlayTest extends ContainerOverlayTestBase {
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Test
+	public void testConfigure() throws Exception {
+
+		File krb5conf = tempFolder.newFile();
+
+		Krb5ConfOverlay overlay = new Krb5ConfOverlay(krb5conf);
+
+		ContainerSpecification spec = new ContainerSpecification();
+		overlay.configure(spec);
+
+		assertEquals(TARGET_PATH.getPath(), spec.getSystemProperties().getString(JAVA_SECURITY_KRB5_CONF, null));
+		checkArtifact(spec, TARGET_PATH);
+	}
+
+	@Test
+	public void testNoConf() throws Exception {
+		Krb5ConfOverlay overlay = new Krb5ConfOverlay((Path) null);
+
+		ContainerSpecification containerSpecification = new ContainerSpecification();
+		overlay.configure(containerSpecification);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlayTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlayTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlayTest.java
new file mode 100644
index 0000000..0894ce6
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlayTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+import static org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay.TARGET_KEYSTORE_PATH;
+import static org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay.TARGET_TRUSTSTORE_PATH;
+import static org.junit.Assert.assertEquals;
+
+public class SSLStoreOverlayTest extends ContainerOverlayTestBase {
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Test
+	public void testConfigure() throws Exception {
+
+		File keystore = tempFolder.newFile();
+		File truststore = tempFolder.newFile();
+		SSLStoreOverlay overlay = new SSLStoreOverlay(keystore, truststore);
+
+		ContainerSpecification spec = new ContainerSpecification();
+		overlay.configure(spec);
+
+		assertEquals(TARGET_KEYSTORE_PATH.getPath(), spec.getDynamicConfiguration().getString(ConfigConstants.SECURITY_SSL_KEYSTORE, null));
+		checkArtifact(spec, TARGET_KEYSTORE_PATH);
+
+		assertEquals(TARGET_TRUSTSTORE_PATH.getPath(), spec.getDynamicConfiguration().getString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, null));
+		checkArtifact(spec, TARGET_TRUSTSTORE_PATH);
+	}
+
+	@Test
+	public void testNoConf() throws Exception {
+		SSLStoreOverlay overlay = new SSLStoreOverlay(null, null);
+
+		ContainerSpecification containerSpecification = new ContainerSpecification();
+		overlay.configure(containerSpecification);
+	}
+
+	@Test
+	public void testBuilderFromEnvironment() throws Exception {
+
+		final Configuration conf = new Configuration();
+		File keystore = tempFolder.newFile();
+		File truststore = tempFolder.newFile();
+
+		conf.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, keystore.getAbsolutePath());
+		conf.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, truststore.getAbsolutePath());
+
+		SSLStoreOverlay.Builder builder = SSLStoreOverlay.newBuilder().fromEnvironment(conf);
+		assertEquals(builder.keystorePath, keystore);
+		assertEquals(builder.truststorePath, truststore);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
index d318a3c..45c5a77 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
@@ -29,6 +29,9 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.Map;
 
 import static org.junit.Assert.fail;
 
@@ -114,4 +117,40 @@ public class CommonTestUtils {
 			fail("Cannot determine Java version: " + e.getMessage());
 		}
 	}
+
+	// This code is taken from: http://stackoverflow.com/a/7201825/568695
+	// it changes the environment variables of this JVM. Use only for testing purposes!
+	@SuppressWarnings("unchecked")
+	public static void setEnv(Map<String, String> newenv) {
+		try {
+			Class<?> processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment");
+			Field theEnvironmentField = processEnvironmentClass.getDeclaredField("theEnvironment");
+			theEnvironmentField.setAccessible(true);
+			Map<String, String> env = (Map<String, String>) theEnvironmentField.get(null);
+			env.putAll(newenv);
+			Field theCaseInsensitiveEnvironmentField = processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
+			theCaseInsensitiveEnvironmentField.setAccessible(true);
+			Map<String, String> cienv = (Map<String, String>) theCaseInsensitiveEnvironmentField.get(null);
+			cienv.putAll(newenv);
+		} catch (NoSuchFieldException e) {
+			try {
+				Class<?>[] classes = Collections.class.getDeclaredClasses();
+				Map<String, String> env = System.getenv();
+				for (Class<?> cl : classes) {
+					if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
+						Field field = cl.getDeclaredField("m");
+						field.setAccessible(true);
+						Object obj = field.get(env);
+						Map<String, String> map = (Map<String, String>) obj;
+						map.clear();
+						map.putAll(newenv);
+					}
+				}
+			} catch (Exception e2) {
+				throw new RuntimeException(e2);
+			}
+		} catch (Exception e1) {
+			throw new RuntimeException(e1);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index aa5e7d3..804b3d4 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -33,6 +33,7 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.util.TestLogger;
@@ -58,7 +59,6 @@ import java.io.FileReader;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStream;
-import java.lang.reflect.Field;
 import java.net.HttpURLConnection;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -542,42 +542,10 @@ public class TestBaseUtils extends TestLogger {
 		return configs;
 	}
 
-	// This code is taken from: http://stackoverflow.com/a/7201825/568695
-	// it changes the environment variables of this JVM. Use only for testing purposes!
-	@SuppressWarnings("unchecked")
 	public static void setEnv(Map<String, String> newenv) {
-		try {
-			Class<?> processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment");
-			Field theEnvironmentField = processEnvironmentClass.getDeclaredField("theEnvironment");
-			theEnvironmentField.setAccessible(true);
-			Map<String, String> env = (Map<String, String>) theEnvironmentField.get(null);
-			env.putAll(newenv);
-			Field theCaseInsensitiveEnvironmentField = processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
-			theCaseInsensitiveEnvironmentField.setAccessible(true);
-			Map<String, String> cienv = (Map<String, String>) theCaseInsensitiveEnvironmentField.get(null);
-			cienv.putAll(newenv);
-		} catch (NoSuchFieldException e) {
-			try {
-				Class<?>[] classes = Collections.class.getDeclaredClasses();
-				Map<String, String> env = System.getenv();
-				for (Class<?> cl : classes) {
-					if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
-						Field field = cl.getDeclaredField("m");
-						field.setAccessible(true);
-						Object obj = field.get(env);
-						Map<String, String> map = (Map<String, String>) obj;
-						map.clear();
-						map.putAll(newenv);
-					}
-				}
-			} catch (Exception e2) {
-				throw new RuntimeException(e2);
-			}
-		} catch (Exception e1) {
-			throw new RuntimeException(e1);
-		}
+		CommonTestUtils.setEnv(newenv);
 	}
-	
+
 	private static ExecutionContext defaultExecutionContext() {
 		return ExecutionContext$.MODULE$.global();
 	}


[2/3] flink git commit: [FLINK-5091] Formalize the Mesos AppMaster environment for docker compatibility

Posted by mx...@apache.org.
[FLINK-5091] Formalize the Mesos AppMaster environment for docker compatibility

- introduced ContainerSpecification.
- reworked how the TM container environment is constructed; eliminated
  special-case environment variables, file layout.
- added dynamic configuration support to GlobalConfiguration.
- integrated the SecurityContext into AM/TM runners.
- added config setting for Mesos framework user.
- support DCOS side-channel authentication.
- set the FS default scheme.
- made the artifact server more generic (no assumption about existence
  of dispatcher, Path-based).
- moved some test code related to overriding the JVM's env.
- moved the Mesos containerizer config code to the MesosTaskManagerParameters.

This closes #2915.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/230bf17b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/230bf17b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/230bf17b

Branch: refs/heads/master
Commit: 230bf17bac3d76959a5cb6aa73ac685757c51cab
Parents: 3b85f42
Author: wrighe3 <er...@emc.com>
Authored: Thu Dec 1 00:21:28 2016 -0800
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Dec 6 00:29:25 2016 +0100

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  15 +
 .../configuration/GlobalConfiguration.java      |  27 +-
 flink-dist/src/main/assemblies/bin.xml          |   6 +
 .../main/flink-bin/mesos-bin/mesos-appmaster.sh |  51 +++
 .../flink-bin/mesos-bin/mesos-taskmanager.sh    |  60 +++
 .../main/java/org/apache/flink/mesos/Utils.java |  21 +
 .../clusterframework/LaunchableMesosWorker.java | 106 ++++-
 .../MesosApplicationMasterRunner.java           | 456 ++++++-------------
 .../clusterframework/MesosConfigKeys.java       |  25 +-
 .../MesosFlinkResourceManager.java              |  45 +-
 .../MesosTaskManagerParameters.java             | 106 ++++-
 .../MesosTaskManagerRunner.java                 |  73 ++-
 .../flink/mesos/util/MesosArtifactResolver.java |  31 ++
 .../flink/mesos/util/MesosArtifactServer.java   | 146 ++++--
 .../MesosFlinkResourceManagerTest.java          |  19 +-
 .../clusterframework/BootstrapTools.java        |  36 ++
 .../ContainerSpecification.java                 | 206 +++++++++
 .../overlays/AbstractContainerOverlay.java      |  72 +++
 .../overlays/CompositeContainerOverlay.java     |  49 ++
 .../overlays/ContainerOverlay.java              |  37 ++
 .../overlays/FlinkDistributionOverlay.java      | 126 +++++
 .../overlays/HadoopConfOverlay.java             | 147 ++++++
 .../overlays/HadoopUserOverlay.java             |  83 ++++
 .../overlays/KeytabOverlay.java                 | 102 +++++
 .../overlays/Krb5ConfOverlay.java               | 111 +++++
 .../overlays/SSLStoreOverlay.java               | 124 +++++
 .../flink/runtime/security/SecurityUtils.java   |   4 +-
 .../overlays/ContainerOverlayTestBase.java      |  73 +++
 .../overlays/FlinkDistributionOverlayTest.java  | 117 +++++
 .../overlays/HadoopConfOverlayTest.java         | 119 +++++
 .../overlays/HadoopUserOverlayTest.java         |  73 +++
 .../overlays/KeytabOverlayTest.java             |  71 +++
 .../overlays/Krb5ConfOverlayTest.java           |  59 +++
 .../overlays/SSLStoreOverlayTest.java           |  78 ++++
 .../flink/core/testutils/CommonTestUtils.java   |  39 ++
 .../apache/flink/test/util/TestBaseUtils.java   |  38 +-
 36 files changed, 2450 insertions(+), 501 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 6bc5e2e..a515c33 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -440,6 +440,11 @@ public final class ConfigConstants {
 	// ------------------------ Mesos Configuration ------------------------
 
 	/**
+	 * The initial number of Mesos tasks to allocate.
+	 */
+	public static final String MESOS_INITIAL_TASKS = "mesos.initial-tasks";
+
+	/**
 	 * The maximum number of failed Mesos tasks before entirely stopping
 	 * the Mesos session / job on Mesos.
 	 *
@@ -484,6 +489,8 @@ public final class ConfigConstants {
 
 	public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET = "mesos.resourcemanager.framework.secret";
 
+	public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_USER = "mesos.resourcemanager.framework.user";
+
 	/**
 	 * The cpus to acquire from Mesos.
 	 *
@@ -1186,6 +1193,8 @@ public final class ConfigConstants {
 
 	public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE = "*";
 
+	public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER = "";
+
 	/** Default value to override SSL support for the Artifact Server */
 	public static final boolean DEFAULT_MESOS_ARTIFACT_SERVER_SSL_ENABLED = true;
 
@@ -1405,6 +1414,12 @@ public final class ConfigConstants {
 	/** The environment variable name which contains the location of the lib folder */
 	public static final String ENV_FLINK_LIB_DIR = "FLINK_LIB_DIR";
 
+	/** The environment variable name which contains the location of the bin directory */
+	public static final String ENV_FLINK_BIN_DIR = "FLINK_BIN_DIR";
+
+	/** The environment variable name which contains the Flink installation root directory */
+	public static final String ENV_FLINK_HOME_DIR = "FLINK_HOME";
+
 	// -------------------------------- Security -------------------------------
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
index ecfbc72..dca6307 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
@@ -39,12 +39,31 @@ public final class GlobalConfiguration {
 
 	public static final String FLINK_CONF_FILENAME = "flink-conf.yaml";
 
+
 	// --------------------------------------------------------------------------------------------
 
 	private GlobalConfiguration() {}
 
 	// --------------------------------------------------------------------------------------------
 
+	private static Configuration dynamicProperties = null;
+
+	/**
+	 * Set the process-wide dynamic properties to be merged with the loaded configuration.
+     */
+	public static void setDynamicProperties(Configuration dynamicProperties) {
+		GlobalConfiguration.dynamicProperties = new Configuration(dynamicProperties);
+	}
+
+	/**
+	 * Get the dynamic properties.
+     */
+	public static Configuration getDynamicProperties() {
+		return GlobalConfiguration.dynamicProperties;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
 	/**
 	 * Loads the global configuration from the environment. Fails if an error occurs during loading. Returns an
 	 * empty configuration object if the environment variable is not set. In production this variable is set but
@@ -90,7 +109,13 @@ public final class GlobalConfiguration {
 					"' (" + confDirFile.getAbsolutePath() + ") does not exist.");
 		}
 
-		return loadYAMLResource(yamlConfigFile);
+		Configuration conf = loadYAMLResource(yamlConfigFile);
+
+		if(dynamicProperties != null) {
+			conf.addAll(dynamicProperties);
+		}
+
+		return conf;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-dist/src/main/assemblies/bin.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml
index b4291d3..901cac9 100644
--- a/flink-dist/src/main/assemblies/bin.xml
+++ b/flink-dist/src/main/assemblies/bin.xml
@@ -82,6 +82,12 @@ under the License.
 			<outputDirectory>bin</outputDirectory>
 			<fileMode>0755</fileMode>
 		</fileSet>
+		<!-- copy Mesos start scripts -->
+		<fileSet>
+			<directory>src/main/flink-bin/mesos-bin</directory>
+			<outputDirectory>bin</outputDirectory>
+			<fileMode>0755</fileMode>
+		</fileSet>
 		
 		<!-- copy default configuration -->
 		<fileSet>

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh
new file mode 100755
index 0000000..d65c6b0
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh
@@ -0,0 +1,51 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+# get Flink config
+. "$bin"/config.sh
+
+# auxiliary function to construct a lightweight classpath for the
+# Flink AppMaster
+constructAppMasterClassPath() {
+
+    while read -d '' -r jarfile ; do
+        if [[ $CC_CLASSPATH = "" ]]; then
+            CC_CLASSPATH="$jarfile";
+        else
+            CC_CLASSPATH="$CC_CLASSPATH":"$jarfile"
+        fi
+    done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0)
+
+    echo $CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS
+}
+
+CC_CLASSPATH=`manglePathList $(constructAppMasterClassPath)`
+
+log=flink-appmaster.log
+log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml"
+
+export FLINK_CONF_DIR
+export FLINK_BIN_DIR
+export FLINK_LIB_DIR
+
+$JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner "$@"
+

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh b/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh
new file mode 100755
index 0000000..ff03abd
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh
@@ -0,0 +1,60 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+# get Flink config
+. "$bin"/config.sh
+
+# auxiliary function to construct a lightweight classpath for the
+# Flink TaskManager
+constructTaskManagerClassPath() {
+
+    while read -d '' -r jarfile ; do
+        if [[ $CC_CLASSPATH = "" ]]; then
+            CC_CLASSPATH="$jarfile";
+        else
+            CC_CLASSPATH="$CC_CLASSPATH":"$jarfile"
+        fi
+    done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0)
+
+    echo $CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS
+}
+
+CC_CLASSPATH=`manglePathList $(constructTaskManagerClassPath)`
+
+log=flink-taskmanager.log
+log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml"
+
+# Add precomputed memory JVM options
+if [ -z "${FLINK_ENV_JAVA_OPTS_MEM}" ]; then
+    FLINK_ENV_JAVA_OPTS_MEM=""
+fi
+export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_MEM}"
+
+# Add TaskManager-specific JVM options
+export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"
+
+export FLINK_CONF_DIR
+export FLINK_BIN_DIR
+export FLINK_LIB_DIR
+
+$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.mesos.runtime.clusterframework.MesosTaskManager "$@"
+

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
index 173ae33..7787e40 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
@@ -18,7 +18,10 @@
 
 package org.apache.flink.mesos;
 
+import org.apache.flink.mesos.util.MesosArtifactResolver;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.apache.mesos.Protos;
+import scala.Option;
 
 import java.net.URL;
 import java.util.Arrays;
@@ -46,6 +49,24 @@ public class Utils {
 	}
 
 	/**
+	 * Construct a Mesos URI.
+	 */
+	public static Protos.CommandInfo.URI uri(MesosArtifactResolver resolver, ContainerSpecification.Artifact artifact) {
+		Option<URL> url = resolver.resolve(artifact.dest);
+		if(url.isEmpty()) {
+			throw new IllegalArgumentException("Unresolvable artifact: " + artifact.dest);
+		}
+
+		return Protos.CommandInfo.URI.newBuilder()
+			.setValue(url.get().toExternalForm())
+			.setOutputFile(artifact.dest.toString())
+			.setExtract(artifact.extract)
+			.setCache(artifact.cachable)
+			.setExecutable(artifact.executable)
+			.build();
+	}
+
+	/**
 	 * Construct a scalar resource value.
 	 */
 	public static Protos.Resource scalar(String name, double value) {

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 5f940b5..c6e51f1 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -23,8 +23,11 @@ import com.netflix.fenzo.TaskAssignmentResult;
 import com.netflix.fenzo.TaskRequest;
 import com.netflix.fenzo.VMTaskFitnessCalculator;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.mesos.Utils;
 import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.util.MesosArtifactResolver;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.apache.mesos.Protos;
 
 import java.util.Collections;
@@ -38,7 +41,10 @@ import static org.apache.flink.mesos.Utils.ranges;
 import static org.apache.flink.mesos.Utils.scalar;
 
 /**
- * Specifies how to launch a Mesos worker.
+ * Implements the launch of a Mesos worker.
+ *
+ * Translates the abstract {@link ContainerSpecification} into a concrete
+ * Mesos-specific {@link org.apache.mesos.Protos.TaskInfo}.
  */
 public class LaunchableMesosWorker implements LaunchableTask {
 
@@ -49,20 +55,24 @@ public class LaunchableMesosWorker implements LaunchableTask {
 		"taskmanager.rpc.port",
 		"taskmanager.data.port" };
 
+	private final MesosArtifactResolver resolver;
+	private final ContainerSpecification containerSpec;
 	private final MesosTaskManagerParameters params;
-	private final Protos.TaskInfo.Builder template;
 	private final Protos.TaskID taskID;
 	private final Request taskRequest;
 
 	/**
 	 * Construct a launchable Mesos worker.
 	 * @param params the TM parameters such as memory, cpu to acquire.
-	 * @param template a template for the TaskInfo to be constructed at launch time.
+	 * @param containerSpec an abstract container specification for launch time.
 	 * @param taskID the taskID for this worker.
 	 */
-	public LaunchableMesosWorker(MesosTaskManagerParameters params, Protos.TaskInfo.Builder template, Protos.TaskID taskID) {
+	public LaunchableMesosWorker(
+		MesosArtifactResolver resolver, MesosTaskManagerParameters params,
+		ContainerSpecification containerSpec, Protos.TaskID taskID) {
+		this.resolver = resolver;
 		this.params = params;
-		this.template = template;
+		this.containerSpec = containerSpec;
 		this.taskID = taskID;
 		this.taskRequest = new Request();
 	}
@@ -157,17 +167,25 @@ public class LaunchableMesosWorker implements LaunchableTask {
 	@Override
 	public Protos.TaskInfo launch(Protos.SlaveID slaveId, TaskAssignmentResult assignment) {
 
+		ContaineredTaskManagerParameters tmParams = params.containeredParameters();
+
 		final Configuration dynamicProperties = new Configuration();
 
-		// specialize the TaskInfo template with assigned resources, environment variables, etc
-		final Protos.TaskInfo.Builder taskInfo = template
-			.clone()
+		// incorporate the dynamic properties carried by the container specification
+		dynamicProperties.addAll(containerSpec.getDynamicConfiguration());
+
+		// build a TaskInfo with assigned resources, environment variables, etc
+		final Protos.TaskInfo.Builder taskInfo = Protos.TaskInfo.newBuilder()
 			.setSlaveId(slaveId)
 			.setTaskId(taskID)
 			.setName(taskID.getValue())
 			.addResources(scalar("cpus", assignment.getRequest().getCPUs()))
 			.addResources(scalar("mem", assignment.getRequest().getMemory()));
 
+		final Protos.CommandInfo.Builder cmd = taskInfo.getCommandBuilder();
+		final Protos.Environment.Builder env = cmd.getEnvironmentBuilder();
+		final StringBuilder jvmArgs = new StringBuilder();
+
 		// use the assigned ports for the TM
 		if (assignment.getAssignedPorts().size() < TM_PORT_KEYS.length) {
 			throw new IllegalArgumentException("unsufficient # of ports assigned");
@@ -179,17 +197,69 @@ public class LaunchableMesosWorker implements LaunchableTask {
 			dynamicProperties.setInteger(key, port);
 		}
 
-		// finalize environment variables
-		final Protos.Environment.Builder environmentBuilder = taskInfo.getCommandBuilder().getEnvironmentBuilder();
+		// ship additional files
+		for(ContainerSpecification.Artifact artifact : containerSpec.getArtifacts()) {
+			cmd.addUris(Utils.uri(resolver, artifact));
+		}
 
-		// propagate the Mesos task ID to the TM
-		environmentBuilder
-			.addVariables(variable(MesosConfigKeys.ENV_FLINK_CONTAINER_ID, taskInfo.getTaskId().getValue()));
+		// propagate environment variables
+		for (Map.Entry<String, String> entry : params.containeredParameters().taskManagerEnv().entrySet()) {
+			env.addVariables(variable(entry.getKey(), entry.getValue()));
+		}
+		for (Map.Entry<String, String> entry : containerSpec.getEnvironmentVariables().entrySet()) {
+			env.addVariables(variable(entry.getKey(), entry.getValue()));
+		}
 
-		// propagate the dynamic configuration properties to the TM
-		String dynamicPropertiesEncoded = FlinkMesosSessionCli.encodeDynamicProperties(dynamicProperties);
-		environmentBuilder
-			.addVariables(variable(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded));
+		// propagate the Mesos task ID to the TM
+		env.addVariables(variable(MesosConfigKeys.ENV_FLINK_CONTAINER_ID, taskInfo.getTaskId().getValue()));
+
+		// finalize the memory parameters
+		jvmArgs.append(" -Xms").append(tmParams.taskManagerHeapSizeMB()).append("m");
+		jvmArgs.append(" -Xmx").append(tmParams.taskManagerHeapSizeMB()).append("m");
+		jvmArgs.append(" -XX:MaxDirectMemorySize=").append(tmParams.taskManagerDirectMemoryLimitMB()).append("m");
+
+		// pass dynamic system properties
+		jvmArgs.append(' ').append(
+			ContainerSpecification.formatSystemProperties(containerSpec.getSystemProperties()));
+
+		// finalize JVM args
+		env.addVariables(variable(MesosConfigKeys.ENV_JVM_ARGS, jvmArgs.toString()));
+
+		// build the launch command w/ dynamic application properties
+		StringBuilder launchCommand = new StringBuilder("$FLINK_HOME/bin/mesos-taskmanager.sh ");
+		launchCommand.append(ContainerSpecification.formatSystemProperties(dynamicProperties));
+		cmd.setValue(launchCommand.toString());
+
+		// build the container info
+		Protos.ContainerInfo.Builder containerInfo = null;
+		switch(params.containerType()) {
+			case MESOS:
+				if(params.containerImageName().isDefined()) {
+					containerInfo = Protos.ContainerInfo.newBuilder()
+						.setType(Protos.ContainerInfo.Type.MESOS)
+						.setMesos(Protos.ContainerInfo.MesosInfo.newBuilder()
+						.setImage(Protos.Image.newBuilder()
+							.setType(Protos.Image.Type.DOCKER)
+							.setDocker(Protos.Image.Docker.newBuilder()
+								.setName(params.containerImageName().get()))));
+				}
+				break;
+
+			case DOCKER:
+				assert(params.containerImageName().isDefined());
+				containerInfo = Protos.ContainerInfo.newBuilder()
+					.setType(Protos.ContainerInfo.Type.DOCKER)
+					.setDocker(Protos.ContainerInfo.DockerInfo.newBuilder()
+						.setNetwork(Protos.ContainerInfo.DockerInfo.Network.HOST)
+						.setImage(params.containerImageName().get()));
+				break;
+
+			default:
+				throw new IllegalStateException("unsupported container type");
+		}
+		if(containerInfo != null) {
+			taskInfo.setContainer(containerInfo);
+		}
 
 		return taskInfo.build();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index ef58250..4b9bd82 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -22,14 +22,17 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.Props;
-
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
 import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
 import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
@@ -38,14 +41,20 @@ import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.mesos.util.ZooKeeperUtils;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
-import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.overlays.CompositeContainerOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.HadoopConfOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.HadoopUserOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.process.ProcessReaper;
 import org.apache.flink.runtime.security.SecurityUtils;
-import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
@@ -53,21 +62,15 @@ import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.NamedThreadFactory;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
-
-import org.apache.hadoop.security.UserGroupInformation;
-
 import org.apache.mesos.Protos;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import scala.Option;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.io.File;
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.URL;
-import java.net.UnknownHostException;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
@@ -75,9 +78,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.flink.mesos.Utils.uri;
-import static org.apache.flink.mesos.Utils.variable;
-
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
@@ -106,6 +106,18 @@ public class MesosApplicationMasterRunner {
 	private static final int ACTOR_DIED_EXIT_CODE = 32;
 
 	// ------------------------------------------------------------------------
+	//  Command-line options
+	// ------------------------------------------------------------------------
+
+	private static final Options ALL_OPTIONS;
+
+	static {
+		ALL_OPTIONS =
+			new Options()
+			.addOption(BootstrapTools.newDynamicPropertiesOption());
+	}
+
+	// ------------------------------------------------------------------------
 	//  Program entry point
 	// ------------------------------------------------------------------------
 
@@ -126,36 +138,44 @@ public class MesosApplicationMasterRunner {
 
 	/**
 	 * The instance entry point for the Mesos AppMaster. Obtains user group
-	 * information and calls the main work method {@link #runPrivileged(Configuration)} as a
+	 * information and calls the main work method {@link #runPrivileged(Configuration,Configuration)} as a
 	 * privileged action.
 	 *
 	 * @param args The command line arguments.
 	 * @return The process exit code.
 	 */
-	protected int run(String[] args) {
+	protected int run(final String[] args) {
 		try {
 			LOG.debug("All environment variables: {}", ENV);
 
-			final String workingDir = ENV.get(MesosConfigKeys.ENV_MESOS_SANDBOX);
-			checkState(workingDir != null, "Sandbox directory variable (%s) not set", MesosConfigKeys.ENV_MESOS_SANDBOX);
+			// load all configuration values up front, so that the program fails fast if any
+			// configuration problem occurs
 
-			// Flink configuration
-			final Configuration dynamicProperties =
-					FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES));
-			LOG.debug("Mesos dynamic properties: {}", dynamicProperties);
+			CommandLineParser parser = new PosixParser();
+			CommandLine cmd = parser.parse(ALL_OPTIONS, args);
 
-			final Configuration configuration = createConfiguration(workingDir, dynamicProperties);
+			final Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
+			GlobalConfiguration.setDynamicProperties(dynamicProperties);
+			final Configuration config = GlobalConfiguration.loadConfiguration();
 
-			SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(configuration);
+			// configure the default filesystem
+			try {
+				FileSystem.setDefaultScheme(config);
+			} catch (IOException e) {
+				throw new IOException("Error while setting the default " +
+					"filesystem scheme from configuration.", e);
+			}
+
+			// configure security
+			SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(config);
 			sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration());
 			SecurityUtils.install(sc);
 
-			LOG.info("Running Flink as user {}", UserGroupInformation.getCurrentUser().getShortUserName());
-
+			// run the actual work in the installed security context
 			return SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
 				@Override
-				public Integer call() {
-					return runPrivileged(configuration);
+				public Integer call() throws Exception {
+					return runPrivileged(config, dynamicProperties);
 				}
 			});
 		}
@@ -175,78 +195,38 @@ public class MesosApplicationMasterRunner {
 	 *
 	 * @return The return code for the Java process.
 	 */
-	protected int runPrivileged(Configuration config) {
+	protected int runPrivileged(Configuration config, Configuration dynamicProperties) {
 
 		ActorSystem actorSystem = null;
 		WebMonitor webMonitor = null;
 		MesosArtifactServer artifactServer = null;
-
-		// ------- (1) load and parse / validate all configurations -------
-
-		// loading all config values here has the advantage that the program fails fast, if any
-		// configuration problem occurs
-
-		final String workingDir = ENV.get(MesosConfigKeys.ENV_MESOS_SANDBOX);
-
-		final String sessionID = ENV.get(MesosConfigKeys.ENV_SESSION_ID);
-		checkState(sessionID != null, "Session ID (%s) not set", MesosConfigKeys.ENV_SESSION_ID);
-
-		// Note that we use the "appMasterHostname" given by the system, to make sure
-		// we use the hostnames consistently throughout akka.
-		// for akka "localhost" and "localhost.localdomain" are different actors.
-		final String appMasterHostname;
+		ExecutorService futureExecutor = null;
+		ExecutorService ioExecutor = null;
 
 		try {
-			appMasterHostname = InetAddress.getLocalHost().getHostName();
-		} catch (UnknownHostException uhe) {
-			LOG.error("Could not retrieve the local hostname.", uhe);
+			// ------- (1) load and parse / validate all configurations -------
 
-			return INIT_ERROR_EXIT_CODE;
-		}
+			// Note that we use the "appMasterHostname" given by the system, to make sure
+			// we use the hostnames consistently throughout akka.
+			// for akka "localhost" and "localhost.localdomain" are different actors.
+			final String appMasterHostname = InetAddress.getLocalHost().getHostName();
 
-		// Mesos configuration
-		final MesosConfiguration mesosConfig = createMesosConfig(config, appMasterHostname);
+			// Mesos configuration
+			final MesosConfiguration mesosConfig = createMesosConfig(config, appMasterHostname);
 
-		int numberProcessors = Hardware.getNumberCPUCores();
+			// JM configuration
+			int numberProcessors = Hardware.getNumberCPUCores();
 
-		final ExecutorService futureExecutor = Executors.newFixedThreadPool(
-			numberProcessors,
-			new NamedThreadFactory("mesos-jobmanager-future-", "-thread-"));
+			futureExecutor = Executors.newFixedThreadPool(
+				numberProcessors,
+				new NamedThreadFactory("mesos-jobmanager-future-", "-thread-"));
 
-		final ExecutorService ioExecutor = Executors.newFixedThreadPool(
-			numberProcessors,
-			new NamedThreadFactory("mesos-jobmanager-io-", "-thread-"));
+			ioExecutor = Executors.newFixedThreadPool(
+				numberProcessors,
+				new NamedThreadFactory("mesos-jobmanager-io-", "-thread-"));
 
-		try {
-			// environment values related to TM
-			final int taskManagerContainerMemory;
-			final int numInitialTaskManagers;
-			final int slotsPerTaskManager;
-
-			try {
-				taskManagerContainerMemory = Integer.parseInt(ENV.get(MesosConfigKeys.ENV_TM_MEMORY));
-			} catch (NumberFormatException e) {
-				throw new RuntimeException("Invalid value for " + MesosConfigKeys.ENV_TM_MEMORY + " : "
-					+ e.getMessage());
-			}
-			try {
-				numInitialTaskManagers = Integer.parseInt(ENV.get(MesosConfigKeys.ENV_TM_COUNT));
-			} catch (NumberFormatException e) {
-				throw new RuntimeException("Invalid value for " + MesosConfigKeys.ENV_TM_COUNT + " : "
-					+ e.getMessage());
-			}
-			try {
-				slotsPerTaskManager = Integer.parseInt(ENV.get(MesosConfigKeys.ENV_SLOTS));
-			} catch (NumberFormatException e) {
-				throw new RuntimeException("Invalid value for " + MesosConfigKeys.ENV_SLOTS + " : "
-					+ e.getMessage());
-			}
-
-			final ContaineredTaskManagerParameters containeredParameters =
-				ContaineredTaskManagerParameters.create(config, taskManagerContainerMemory, slotsPerTaskManager);
-
-			final MesosTaskManagerParameters taskManagerParameters =
-				MesosTaskManagerParameters.create(config, containeredParameters);
+			// TM configuration
+			final MesosTaskManagerParameters taskManagerParameters = MesosTaskManagerParameters.create(config);
 
 			LOG.info("TaskManagers will be created with {} task slots",
 				taskManagerParameters.containeredParameters().numSlots());
@@ -257,7 +237,7 @@ public class MesosApplicationMasterRunner {
 				taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB(),
 				taskManagerParameters.cpus());
 
-			// JM endpoint, which should be explicitly configured by the dispatcher (based on acquired net resources)
+			// JM endpoint, which should be explicitly configured based on acquired net resources
 			final int listeningPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
 				ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
 			checkState(listeningPort >= 0 && listeningPort <= 65536, "Config parameter \"" +
@@ -279,18 +259,28 @@ public class MesosApplicationMasterRunner {
 			LOG.debug("Starting Artifact Server");
 			final int artifactServerPort = config.getInteger(ConfigConstants.MESOS_ARTIFACT_SERVER_PORT_KEY,
 				ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_PORT);
-			artifactServer = new MesosArtifactServer(sessionID, akkaHostname, artifactServerPort, config);
+			final String artifactServerPrefix = UUID.randomUUID().toString();
+			artifactServer = new MesosArtifactServer(artifactServerPrefix, akkaHostname, artifactServerPort, config);
 
 			// ----------------- (3) Generate the configuration for the TaskManagers -------------------
 
+			// generate a container spec which conveys the artifacts/vars needed to launch a TM
+			ContainerSpecification taskManagerContainerSpec = new ContainerSpecification();
+
+			// propagate the AM dynamic configuration to the TM
+			taskManagerContainerSpec.getDynamicConfiguration().addAll(dynamicProperties);
+
+			// propagate newly-generated configuration elements
 			final Configuration taskManagerConfig = BootstrapTools.generateTaskManagerConfiguration(
-				config, akkaHostname, akkaPort, slotsPerTaskManager, TASKMANAGER_REGISTRATION_TIMEOUT);
-			LOG.debug("TaskManager configuration: {}", taskManagerConfig);
+				new Configuration(), akkaHostname, akkaPort, taskManagerParameters.containeredParameters().numSlots(),
+				TASKMANAGER_REGISTRATION_TIMEOUT);
+			taskManagerContainerSpec.getDynamicConfiguration().addAll(taskManagerConfig);
+
+			// apply the overlays
+			applyOverlays(config, taskManagerContainerSpec);
 
-			final Protos.TaskInfo.Builder taskManagerContext = createTaskManagerContext(
-				config, ENV,
-				taskManagerParameters, taskManagerConfig,
-				workingDir, getTaskManagerClass(), artifactServer, LOG);
+			// configure the artifact server to serve the specified artifacts
+			configureArtifactServer(artifactServer, taskManagerContainerSpec);
 
 			// ----------------- (4) start the actors -------------------
 
@@ -341,8 +331,8 @@ public class MesosApplicationMasterRunner {
 				workerStore,
 				leaderRetriever,
 				taskManagerParameters,
-				taskManagerContext,
-				numInitialTaskManagers,
+				taskManagerContainerSpec,
+				artifactServer,
 				LOG);
 
 			ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps, "Mesos_Resource_Master");
@@ -389,8 +379,21 @@ public class MesosApplicationMasterRunner {
 				}
 			}
 
-			futureExecutor.shutdownNow();
-			ioExecutor.shutdownNow();
+			if(futureExecutor != null) {
+				try {
+					futureExecutor.shutdownNow();
+				} catch (Throwable tt) {
+					LOG.error("Error shutting down future executor", tt);
+				}
+			}
+
+			if(ioExecutor != null) {
+				try {
+					ioExecutor.shutdownNow();
+				} catch (Throwable tt) {
+					LOG.error("Error shutting down io executor", tt);
+				}
+			}
 
 			return INIT_ERROR_EXIT_CODE;
 		}
@@ -442,35 +445,12 @@ public class MesosApplicationMasterRunner {
 		return MemoryArchivist.class;
 	}
 
-	protected Class<? extends TaskManager> getTaskManagerClass() {
-		return MesosTaskManager.class;
-	}
-
-	/**
-	 *
-	 * @param baseDirectory
-	 * @param additional
-	 *
-	 * @return The configuration to be used by the TaskManagers.
-	 */
-	private static Configuration createConfiguration(String baseDirectory, Configuration additional) {
-		LOG.info("Loading config from directory {}", baseDirectory);
-
-		Configuration configuration = GlobalConfiguration.loadConfiguration();
-
-		// add dynamic properties to JobManager configuration.
-		configuration.addAll(additional);
-
-		return configuration;
-	}
-
 	/**
 	 * Loads and validates the ResourceManager Mesos configuration from the given Flink configuration.
 	 */
 	public static MesosConfiguration createMesosConfig(Configuration flinkConfig, String hostname) {
 
 		Protos.FrameworkInfo.Builder frameworkInfo = Protos.FrameworkInfo.newBuilder()
-			.setUser("")
 			.setHostname(hostname);
 		Protos.Credential.Builder credential = null;
 
@@ -494,6 +474,10 @@ public class MesosApplicationMasterRunner {
 			ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE,
 			ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE));
 
+		frameworkInfo.setUser(flinkConfig.getString(
+			ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_USER,
+			ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER));
+
 		if(flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) {
 			frameworkInfo.setPrincipal(flinkConfig.getString(
 				ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL, null));
@@ -501,15 +485,16 @@ public class MesosApplicationMasterRunner {
 			credential = Protos.Credential.newBuilder();
 			credential.setPrincipal(frameworkInfo.getPrincipal());
 
-			if(!flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET)) {
-				throw new IllegalConfigurationException(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET + " must be configured.");
+			// some environments use a side-channel to communicate the secret to Mesos,
+			// and thus don't set the 'secret' configuration setting
+			if(flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET)) {
+				credential.setSecret(flinkConfig.getString(
+					ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET, null));
 			}
-			credential.setSecret(flinkConfig.getString(
-				ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET, null));
 		}
 
 		MesosConfiguration mesos =
-			new MesosConfiguration(masterUrl, frameworkInfo, Option.apply(credential));
+			new MesosConfiguration(masterUrl, frameworkInfo, scala.Option.apply(credential));
 
 		return mesos;
 	}
@@ -533,203 +518,34 @@ public class MesosApplicationMasterRunner {
 	}
 
 	/**
-	 * Creates a Mesos task info template, which describes how to bring up a TaskManager process as
-	 * a Mesos task.
+	 * Populate the given TaskManager container specification by applying a composite of container overlays.
 	 *
 	 * <p>This code is extremely Mesos-specific and registers all the artifacts that the TaskManager
-	 * needs (such as JAR file, config file, ...) and all environment variables in a task info record.
+	 * needs (such as JAR file, config file, ...) and all environment variables into a container specification.
 	 * The Mesos fetcher then ensures that those artifacts will be copied into the task's sandbox directory.
 	 * A lightweight HTTP server serves the artifacts to the fetcher.
-	 *
-	 * <p>We do this work before we start the ResourceManager actor in order to fail early if
-	 * any of the operations here fail.
-	 *
-	 * @param flinkConfig
-	 *         The Flink configuration object.
-	 * @param env
-	 *         The environment variables.
-	 * @param tmParams
-	 *         The TaskManager container memory parameters.
-	 * @param taskManagerConfig
-	 *         The configuration for the TaskManagers.
-	 * @param workingDirectory
-	 *         The current application master container's working directory.
-	 * @param taskManagerMainClass
-	 *         The class with the main method.
-	 * @param artifactServer
-	 *         The artifact server.
-	 * @param log
-	 *         The logger.
-	 *
-	 * @return The task info template for the TaskManager processes.
-	 *
-	 * @throws Exception Thrown if the task info could not be created, for example if
-	 *                   the resources could not be copied.
-	 */
-	public static Protos.TaskInfo.Builder createTaskManagerContext(
-		Configuration flinkConfig,
-		Map<String, String> env,
-		MesosTaskManagerParameters tmParams,
-		Configuration taskManagerConfig,
-		String workingDirectory,
-		Class<?> taskManagerMainClass,
-		MesosArtifactServer artifactServer,
-		Logger log) throws Exception {
-
-
-		Protos.TaskInfo.Builder info = Protos.TaskInfo.newBuilder();
-		Protos.CommandInfo.Builder cmd = Protos.CommandInfo.newBuilder();
-
-		log.info("Setting up artifacts for TaskManagers");
-
-		String shipListString = env.get(MesosConfigKeys.ENV_CLIENT_SHIP_FILES);
-		checkState(shipListString != null, "Environment variable %s not set", MesosConfigKeys.ENV_CLIENT_SHIP_FILES);
-
-		String classPathString = env.get(MesosConfigKeys.ENV_FLINK_CLASSPATH);
-		checkState(classPathString != null, "Environment variable %s not set", MesosConfigKeys.ENV_FLINK_CLASSPATH);
-
-		// register the Flink jar
-		final File flinkJarFile = new File(workingDirectory, "flink.jar");
-		cmd.addUris(uri(artifactServer.addFile(flinkJarFile, "flink.jar"), true));
-
-		String hadoopConfDir = env.get("HADOOP_CONF_DIR");
-		LOG.debug("ENV: hadoopConfDir = {}", hadoopConfDir);
-
-		//upload Hadoop configurations to artifact server
-		boolean hadoopConf = false;
-		if(hadoopConfDir != null && hadoopConfDir.length() != 0) {
-			File source = new File(hadoopConfDir);
-			if(source.exists() && source.isDirectory()) {
-				hadoopConf = true;
-				File[] fileList = source.listFiles();
-				for(File file: fileList) {
-					if(file.getName().equals("core-site.xml") || file.getName().equals("hdfs-site.xml")) {
-						LOG.debug("Adding local file: [{}] to artifact server", file);
-						File f = new File(hadoopConfDir, file.getName());
-						cmd.addUris(uri(artifactServer.addFile(f, file.getName()), true));
-					}
-				}
-			}
-		}
-
-		//upload keytab to the artifact server
-		String keytabFileName = null;
-		String keytab = flinkConfig.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null);
-		if(keytab != null) {
-			File source = new File(keytab);
-			if(source.exists()) {
-				LOG.debug("Adding keytab file: [{}] to artifact server", source);
-				keytabFileName = source.getName();
-				cmd.addUris(uri(artifactServer.addFile(source, source.getName()), true));
-			}
-		}
-
-		String principal = flinkConfig.getString(ConfigConstants.SECURITY_PRINCIPAL_KEY, null);
-		if(keytabFileName != null && principal != null) {
-			//reset the configurations since we will use in-memory reference from within the TM instance
-			taskManagerConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY,"");
-			taskManagerConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY,"");
-		}
-
-		// register the TaskManager configuration
-		final File taskManagerConfigFile =
-			new File(workingDirectory, UUID.randomUUID() + "-taskmanager-conf.yaml");
-		LOG.debug("Writing TaskManager configuration to {}", taskManagerConfigFile.getAbsolutePath());
-		BootstrapTools.writeConfiguration(taskManagerConfig, taskManagerConfigFile);
-		cmd.addUris(uri(artifactServer.addFile(taskManagerConfigFile, GlobalConfiguration.FLINK_CONF_FILENAME), true));
-
-		// prepare additional files to be shipped
-		for (String pathStr : shipListString.split(",")) {
-			if (!pathStr.isEmpty()) {
-				File shipFile = new File(workingDirectory, pathStr);
-				cmd.addUris(uri(artifactServer.addFile(shipFile, shipFile.getName()), true));
-			}
-		}
-
-		log.info("Creating task info for TaskManagers");
-
-		// build the launch command
-		boolean hasLogback = new File(workingDirectory, "logback.xml").exists();
-		boolean hasLog4j = new File(workingDirectory, "log4j.properties").exists();
-		boolean hasKrb5 = false;
-
-		String launchCommand = BootstrapTools.getTaskManagerShellCommand(
-			flinkConfig, tmParams.containeredParameters(), ".", ".",
-			hasLogback, hasLog4j, hasKrb5, taskManagerMainClass);
-		cmd.setValue(launchCommand);
-
-		// build the environment variables
-		Protos.Environment.Builder envBuilder = Protos.Environment.newBuilder();
-		for (Map.Entry<String, String> entry : tmParams.containeredParameters().taskManagerEnv().entrySet()) {
-			envBuilder.addVariables(variable(entry.getKey(), entry.getValue()));
-		}
-		envBuilder.addVariables(variable(MesosConfigKeys.ENV_CLASSPATH, classPathString));
-
-		//add hadoop config directory to the environment
-		if(hadoopConf) {
-			envBuilder.addVariables(variable(MesosConfigKeys.ENV_HADOOP_CONF_DIR, "."));
-		}
-
-		//add keytab and principal to environment
-		if(keytabFileName != null && principal != null) {
-			envBuilder.addVariables(variable(MesosConfigKeys.ENV_KEYTAB, keytabFileName));
-			envBuilder.addVariables(variable(MesosConfigKeys.ENV_KEYTAB_PRINCIPAL, principal));
-		}
-
-		envBuilder.addVariables(variable(MesosConfigKeys.ENV_HADOOP_USER_NAME,
-				UserGroupInformation.getCurrentUser().getUserName()));
-
-		cmd.setEnvironment(envBuilder);
-
-		info.setCommand(cmd);
-
-		// Set container for task manager if specified in configs.
-		String tmImageName = flinkConfig.getString(
-			ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_IMAGE_NAME, "");
-
-		if (tmImageName.length() > 0) {
-			String taskManagerContainerType = flinkConfig.getString(
-				ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE,
-				ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_TASKS_CONTAINER_IMAGE_TYPE);
-
-			Protos.ContainerInfo.Builder containerInfo;
-
-			switch (taskManagerContainerType) {
-				case ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_MESOS:
-					containerInfo = Protos.ContainerInfo.newBuilder()
-						.setType(Protos.ContainerInfo.Type.MESOS)
-						.setMesos(Protos.ContainerInfo.MesosInfo.newBuilder()
-							.setImage(Protos.Image.newBuilder()
-								.setType(Protos.Image.Type.DOCKER)
-								.setDocker(Protos.Image.Docker.newBuilder()
-									.setName(tmImageName))));
-					break;
-				case ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_DOCKER:
-					containerInfo = Protos.ContainerInfo.newBuilder()
-						.setType(Protos.ContainerInfo.Type.DOCKER)
-						.setDocker(Protos.ContainerInfo.DockerInfo.newBuilder()
-							.setNetwork(Protos.ContainerInfo.DockerInfo.Network.HOST)
-							.setImage(tmImageName));
-					break;
-				default:
-					LOG.warn(
-						"Invalid container type '{}' provided for setting {}. Valid values are '{}' or '{}'. " +
-							"Starting task managers now without container.",
-						taskManagerContainerType,
-						ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE,
-						ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_MESOS,
-						ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_DOCKER);
-
-					containerInfo = null;
-
-					break;
-			}
+     */
+	private static void applyOverlays(
+		Configuration globalConfiguration, ContainerSpecification containerSpec) throws IOException {
+
+		// create the overlays that will produce the specification
+		CompositeContainerOverlay overlay = new CompositeContainerOverlay(
+			FlinkDistributionOverlay.newBuilder().fromEnvironment(globalConfiguration).build(),
+			HadoopConfOverlay.newBuilder().fromEnvironment(globalConfiguration).build(),
+			HadoopUserOverlay.newBuilder().fromEnvironment(globalConfiguration).build(),
+			KeytabOverlay.newBuilder().fromEnvironment(globalConfiguration).build(),
+			Krb5ConfOverlay.newBuilder().fromEnvironment(globalConfiguration).build(),
+			SSLStoreOverlay.newBuilder().fromEnvironment(globalConfiguration).build()
+		);
+
+		// apply the overlays
+		overlay.configure(containerSpec);
+	}
 
-			if (containerInfo != null) {
-				info.setContainer(containerInfo);
-			}
+	private static void configureArtifactServer(MesosArtifactServer server, ContainerSpecification container) throws IOException {
+		// serve the artifacts associated with the container environment
+		for(ContainerSpecification.Artifact artifact : container.getArtifacts()) {
+			server.addPath(artifact.source, artifact.dest);
 		}
-
-		return info;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
index bc6dde4..ebd9af5 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
@@ -26,21 +26,20 @@ public class MesosConfigKeys {
 	//  Environment variable names
 	// ------------------------------------------------------------------------
 
-	public static final String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY";
-	public static final String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
-	public static final String ENV_SLOTS = "_SLOTS";
-	public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
-	public static final String ENV_HADOOP_USER_NAME = "HADOOP_USER_NAME";
-	public static final String ENV_DYNAMIC_PROPERTIES = "_DYNAMIC_PROPERTIES";
+	/**
+	 * The Mesos task ID, used by the TM for informational purposes
+	 */
 	public static final String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID";
+
+	/**
+	 * Reserved for future enhancement
+	 */
 	public static final String ENV_FLINK_TMP_DIR = "_FLINK_TMP_DIR";
-	public static final String ENV_FLINK_CLASSPATH = "_FLINK_CLASSPATH";
-	public static final String ENV_CLASSPATH = "CLASSPATH";
-	public static final String ENV_MESOS_SANDBOX = "MESOS_SANDBOX";
-	public static final String ENV_SESSION_ID = "_CLIENT_SESSION_ID";
-	public static final String ENV_HADOOP_CONF_DIR = "HADOOP_CONF_DIR";
-	public static final String ENV_KEYTAB = "_KEYTAB_FILE";
-	public static final String ENV_KEYTAB_PRINCIPAL = "_KEYTAB_PRINCIPAL";
+
+	/**
+	 * JVM arguments, used by the JM and TM
+	 */
+	public static final String ENV_JVM_ARGS = "JVM_ARGS";
 
 	/** Private constructor to prevent instantiation */
 	private MesosConfigKeys() {}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
index 6b24ee8..a7321a3 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
@@ -27,6 +27,7 @@ import com.netflix.fenzo.functions.Action1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
 import org.apache.flink.mesos.scheduler.ConnectionMonitor;
 import org.apache.flink.mesos.scheduler.LaunchableTask;
@@ -44,9 +45,11 @@ import org.apache.flink.mesos.scheduler.messages.ReRegistered;
 import org.apache.flink.mesos.scheduler.messages.Registered;
 import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
 import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosArtifactResolver;
 import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
 import org.apache.flink.runtime.clusterframework.messages.StopCluster;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -76,8 +79,11 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 	/** The TaskManager container parameters (like container memory size) */
 	private final MesosTaskManagerParameters taskManagerParameters;
 
-	/** Context information used to start a TaskManager Java process */
-	private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+	/** Container specification for launching a TM */
+	private final ContainerSpecification taskManagerContainerSpec;
+
+	/** Resolver for HTTP artifacts **/
+	private final MesosArtifactResolver artifactResolver;
 
 	/** Number of failed Mesos tasks before stopping the application. -1 means infinite. */
 	private final int maxFailedTasks;
@@ -112,7 +118,8 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 		MesosWorkerStore workerStore,
 		LeaderRetrievalService leaderRetrievalService,
 		MesosTaskManagerParameters taskManagerParameters,
-		Protos.TaskInfo.Builder taskManagerLaunchContext,
+		ContainerSpecification taskManagerContainerSpec,
+		MesosArtifactResolver artifactResolver,
 		int maxFailedTasks,
 		int numInitialTaskManagers) {
 
@@ -121,9 +128,10 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 		this.mesosConfig = requireNonNull(mesosConfig);
 
 		this.workerStore = requireNonNull(workerStore);
+		this.artifactResolver = requireNonNull(artifactResolver);
 
 		this.taskManagerParameters = requireNonNull(taskManagerParameters);
-		this.taskManagerLaunchContext = requireNonNull(taskManagerLaunchContext);
+		this.taskManagerContainerSpec = requireNonNull(taskManagerContainerSpec);
 		this.maxFailedTasks = maxFailedTasks;
 
 		this.workersInNew = new HashMap<>();
@@ -661,7 +669,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 
 	private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID) {
 		LaunchableMesosWorker launchable =
-			new LaunchableMesosWorker(taskManagerParameters, taskManagerLaunchContext, taskID);
+			new LaunchableMesosWorker(artifactResolver, taskManagerParameters, taskManagerContainerSpec, taskID);
 		return launchable;
 	}
 
@@ -723,10 +731,10 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 	 *             The Flink configuration object.
 	 * @param taskManagerParameters
 	 *             The parameters for launching TaskManager containers.
-	 * @param taskManagerLaunchContext
-	 *             The parameters for launching the TaskManager processes in the TaskManager containers.
-	 * @param numInitialTaskManagers
-	 *             The initial number of TaskManagers to allocate.
+	 * @param taskManagerContainerSpec
+	 *             The container specification.
+	 * @param artifactResolver
+	 *             The artifact resolver to locate artifacts
 	 * @param log
 	 *             The logger to log to.
 	 *
@@ -738,10 +746,22 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 			MesosWorkerStore workerStore,
 			LeaderRetrievalService leaderRetrievalService,
 			MesosTaskManagerParameters taskManagerParameters,
-			Protos.TaskInfo.Builder taskManagerLaunchContext,
-			int numInitialTaskManagers,
+			ContainerSpecification taskManagerContainerSpec,
+			MesosArtifactResolver artifactResolver,
 			Logger log)
 	{
+
+		final int numInitialTaskManagers = flinkConfig.getInteger(
+			ConfigConstants.MESOS_INITIAL_TASKS, 0);
+		if (numInitialTaskManagers >= 0) {
+			log.info("Mesos framework to allocate {} initial tasks",
+				numInitialTaskManagers);
+		}
+		else {
+			throw new IllegalConfigurationException("Invalid value for " +
+				ConfigConstants.MESOS_INITIAL_TASKS + ", which must be at least zero.");
+		}
+
 		final int maxFailedTasks = flinkConfig.getInteger(
 			ConfigConstants.MESOS_MAX_FAILED_TASKS, numInitialTaskManagers);
 		if (maxFailedTasks >= 0) {
@@ -755,7 +775,8 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 			workerStore,
 			leaderRetrievalService,
 			taskManagerParameters,
-			taskManagerLaunchContext,
+			taskManagerContainerSpec,
+			artifactResolver,
 			maxFailedTasks,
 			numInitialTaskManagers);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
index 1b19d08..7fae58c 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
@@ -19,10 +19,14 @@
 package org.apache.flink.mesos.runtime.clusterframework;
 
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import scala.Option;
 
 import static java.util.Objects.requireNonNull;
+import static org.apache.flink.configuration.ConfigOptions.key;
 
 /**
  * This class describes the Mesos-specific parameters for launching a TaskManager process.
@@ -32,13 +36,43 @@ import static java.util.Objects.requireNonNull;
  */
 public class MesosTaskManagerParameters {
 
-	private double cpus;
+	public static final ConfigOption<Integer> MESOS_RM_TASKS_SLOTS =
+			key(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS)
+			.defaultValue(1);
 
-	private ContaineredTaskManagerParameters containeredParameters;
+	public static final ConfigOption<Integer> MESOS_RM_TASKS_MEMORY_MB =
+			key("mesos.resourcemanager.tasks.mem")
+			.defaultValue(1024);
 
-	public MesosTaskManagerParameters(double cpus, ContaineredTaskManagerParameters containeredParameters) {
+	public static final ConfigOption<Double> MESOS_RM_TASKS_CPUS =
+			key("mesos.resourcemanager.tasks.cpus")
+			.defaultValue(0.0);
+
+	public static final ConfigOption<String> MESOS_RM_CONTAINER_TYPE =
+		key("mesos.resourcemanager.tasks.container.type")
+			.defaultValue("mesos");
+
+	public static final ConfigOption<String> MESOS_RM_CONTAINER_IMAGE_NAME =
+		key("mesos.resourcemanager.tasks.container.image.name")
+			.noDefaultValue();
+
+	private final double cpus;
+
+	private final ContainerType containerType;
+
+	private final Option<String> containerImageName;
+
+	private final ContaineredTaskManagerParameters containeredParameters;
+
+	/**
+	 * Creates the parameters; throws NullPointerException if an object-typed argument is null.
+	 */
+	public MesosTaskManagerParameters(double cpus, ContainerType containerType,
+		Option<String> containerImageName, ContaineredTaskManagerParameters containeredParameters) {
 		requireNonNull(containeredParameters);
 		this.cpus = cpus;
+		this.containerType = requireNonNull(containerType);
+		this.containerImageName = requireNonNull(containerImageName);
 		this.containeredParameters = containeredParameters;
 	}
 
@@ -50,6 +84,22 @@ public class MesosTaskManagerParameters {
 	}
 
 	/**
+	 * Get the container type (Mesos or Docker).  The default is Mesos.
+	 *
+	 * Mesos provides a facility for a framework to specify which containerizer to use.
+     */
+	public ContainerType containerType() {
+		return containerType;
+	}
+
+	/**
+	 * Get the container image name.
+     */
+	public Option<String> containerImageName() {
+		return containerImageName;
+	}
+
+	/**
 	 * Get the common containered parameters.
      */
 	public ContaineredTaskManagerParameters containeredParameters() {
@@ -60,6 +110,8 @@ public class MesosTaskManagerParameters {
 	public String toString() {
 		return "MesosTaskManagerParameters{" +
 			"cpus=" + cpus +
+			", containerType=" + containerType +
+			", containerImageName=" + containerImageName +
 			", containeredParameters=" + containeredParameters +
 			'}';
 	}
@@ -67,15 +119,49 @@ public class MesosTaskManagerParameters {
 	/**
 	 * Create the Mesos TaskManager parameters.
 	 * @param flinkConfig the TM configuration.
-	 * @param containeredParameters additional containered parameters.
      */
-	public static MesosTaskManagerParameters create(
-		Configuration flinkConfig,
-		ContaineredTaskManagerParameters containeredParameters) {
+	public static MesosTaskManagerParameters create(Configuration flinkConfig) {
 
-		double cpus = flinkConfig.getDouble(ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CPUS,
-			Math.max(containeredParameters.numSlots(), 1.0));
+		// parse the common parameters
+		ContaineredTaskManagerParameters containeredParameters = ContaineredTaskManagerParameters.create(
+			flinkConfig,
+			flinkConfig.getInteger(MESOS_RM_TASKS_MEMORY_MB),
+			flinkConfig.getInteger(MESOS_RM_TASKS_SLOTS));
+
+		double cpus = flinkConfig.getDouble(MESOS_RM_TASKS_CPUS);
+		if(cpus <= 0.0) {
+			cpus = Math.max(containeredParameters.numSlots(), 1.0);
+		}
+
+		// parse the containerization parameters
+		String imageName = flinkConfig.getString(MESOS_RM_CONTAINER_IMAGE_NAME);
+
+		ContainerType containerType;
+		String containerTypeString = flinkConfig.getString(MESOS_RM_CONTAINER_TYPE);
+		switch(containerTypeString) {
+			case ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_MESOS:
+				containerType = ContainerType.MESOS;
+				break;
+			case ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_DOCKER:
+				containerType = ContainerType.DOCKER;
+				if(imageName == null || imageName.length() == 0) {
+					throw new IllegalConfigurationException(MESOS_RM_CONTAINER_IMAGE_NAME.key() +
+						" must be specified for docker container type");
+				}
+				break;
+			default:
+				throw new IllegalConfigurationException("invalid container type: " + containerTypeString);
+		}
+
+		return new MesosTaskManagerParameters(
+			cpus,
+			containerType,
+			Option.apply(imageName),
+			containeredParameters);
+	}
 
-		return new MesosTaskManagerParameters(cpus, containeredParameters);
+	public enum ContainerType {
+		MESOS,
+		DOCKER
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
index 5100deb..75b5043 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
@@ -18,15 +18,20 @@
 
 package org.apache.flink.mesos.runtime.clusterframework;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.Callable;
 
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
 import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.taskmanager.TaskManager;
@@ -35,7 +40,6 @@ import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.util.Preconditions;
-import org.apache.hadoop.security.UserGroupInformation;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,24 +51,33 @@ public class MesosTaskManagerRunner {
 
 	private static final Logger LOG = LoggerFactory.getLogger(MesosTaskManagerRunner.class);
 
+	private static final Options ALL_OPTIONS;
+
+	static {
+		ALL_OPTIONS =
+			new Options()
+				.addOption(BootstrapTools.newDynamicPropertiesOption());
+	}
+
 	/** The process environment variables */
 	private static final Map<String, String> ENV = System.getenv();
 
-	public static void runTaskManager(String[] args, final Class<? extends TaskManager> taskManager) throws IOException {
+	public static void runTaskManager(String[] args, final Class<? extends TaskManager> taskManager) throws Exception {
 		EnvironmentInformation.logEnvironmentInfo(LOG, taskManager.getSimpleName(), args);
 		SignalHandler.register(LOG);
 		JvmShutdownSafeguard.installAsShutdownHook(LOG);
 
 		// try to parse the command line arguments
+		CommandLineParser parser = new PosixParser();
+		CommandLine cmd = parser.parse(ALL_OPTIONS, args);
+
 		final Configuration configuration;
 		try {
-			configuration = TaskManager.parseArgsAndLoadConfig(args);
-
-			// add dynamic properties to TaskManager configuration.
-			final Configuration dynamicProperties =
-				FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES));
+			final Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
+			GlobalConfiguration.setDynamicProperties(dynamicProperties);
 			LOG.debug("Mesos dynamic properties: {}", dynamicProperties);
-			configuration.addAll(dynamicProperties);
+
+			configuration = GlobalConfiguration.loadConfiguration();
 		}
 		catch (Throwable t) {
 			LOG.error("Failed to load the TaskManager configuration and dynamic properties.", t);
@@ -74,7 +87,6 @@ public class MesosTaskManagerRunner {
 
 		// read the environment variables
 		final Map<String, String> envs = System.getenv();
-		final String effectiveUsername = envs.get(MesosConfigKeys.ENV_HADOOP_USER_NAME);
 		final String tmpDirs = envs.get(MesosConfigKeys.ENV_FLINK_TMP_DIR);
 
 		// configure local directory
@@ -88,20 +100,12 @@ public class MesosTaskManagerRunner {
 			configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, tmpDirs);
 		}
 
-		final String keytab = envs.get(MesosConfigKeys.ENV_KEYTAB);
-		LOG.info("Keytab file:{}", keytab);
-
-		final String principal = envs.get(MesosConfigKeys.ENV_KEYTAB_PRINCIPAL);
-		LOG.info("Keytab principal:{}", principal);
-
-		if(keytab != null && keytab.length() != 0) {
-			File f = new File(".", keytab);
-			if(!f.exists()) {
-				LOG.error("Could not locate keytab file:[" + keytab + "]");
-				System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
-			}
-			configuration.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytab);
-			configuration.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, principal);
+		// configure the default filesystem
+		try {
+			FileSystem.setDefaultScheme(configuration);
+		} catch (IOException e) {
+			throw new IOException("Error while setting the default " +
+				"filesystem scheme from configuration.", e);
 		}
 
 		// tell akka to die in case of an error
@@ -112,23 +116,17 @@ public class MesosTaskManagerRunner {
 		final ResourceID resourceId = new ResourceID(containerID);
 		LOG.info("ResourceID assigned for this container: {}", resourceId);
 
-		String hadoopConfDir = envs.get(MesosConfigKeys.ENV_HADOOP_CONF_DIR);
-		LOG.info("hadoopConfDir: {}", hadoopConfDir);
-
+		// Run the TM in the security context
 		SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(configuration);
-		if(hadoopConfDir != null && hadoopConfDir.length() != 0) {
-			sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration());
-		}
+		sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration());
+		SecurityUtils.install(sc);
 
 		try {
-			SecurityUtils.install(sc);
-			LOG.info("Mesos task runs as '{}', setting user to execute Flink TaskManager to '{}'",
-					UserGroupInformation.getCurrentUser().getShortUserName(), effectiveUsername);
-			SecurityUtils.getInstalledContext().runSecured(new Callable<Object>() {
+			SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
 				@Override
-				public Object call() throws Exception {
+				public Integer call() throws Exception {
 					TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, taskManager);
-					return null;
+					return 0;
 				}
 			});
 		}
@@ -136,6 +134,5 @@ public class MesosTaskManagerRunner {
 			LOG.error("Error while starting the TaskManager", t);
 			System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
 		}
-
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java
new file mode 100644
index 0000000..a6a26dc
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.util;
+
+import org.apache.flink.core.fs.Path;
+import scala.Option;
+
+import java.net.URL;
+
+/** An interface for resolving artifact URIs. */
+public interface MesosArtifactResolver {
+
+	/** Returns the URL under which the given remote file can be fetched, or none if it is unknown. */
+	Option<URL> resolve(Path remoteFile);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
index fbf61ac..37cb260 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
@@ -26,7 +26,6 @@ import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
-import io.netty.channel.DefaultFileRegion;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
@@ -43,24 +42,32 @@ import io.netty.handler.codec.http.LastHttpContent;
 import io.netty.handler.codec.http.router.Handler;
 import io.netty.handler.codec.http.router.Routed;
 import io.netty.handler.codec.http.router.Router;
+import io.netty.handler.stream.ChunkedStream;
+
+import io.netty.handler.stream.ChunkedWriteHandler;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.CharsetUtil;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.jets3t.service.utils.Mimetypes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Option;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
 
 import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL;
 import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
@@ -82,7 +89,7 @@ import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
  * http://mesos.apache.org/documentation/latest/fetcher/
  * http://mesos.apache.org/documentation/latest/fetcher-cache-internals/
  */
-public class MesosArtifactServer {
+public class MesosArtifactServer implements MesosArtifactResolver {
 
 	private static final Logger LOG = LoggerFactory.getLogger(MesosArtifactServer.class);
 
@@ -92,17 +99,19 @@ public class MesosArtifactServer {
 
 	private Channel serverChannel;
 
-	private URL baseURL;
+	private final URL baseURL;
+
+	private final Map<Path,URL> paths = new HashMap<>();
 
 	private final SSLContext serverSSLContext;
 
-	public MesosArtifactServer(String sessionID, String serverHostname, int configuredPort, Configuration config)
-			throws Exception {
+	public MesosArtifactServer(String prefix, String serverHostname, int configuredPort, Configuration config)
+		throws Exception {
 		if (configuredPort < 0 || configuredPort > 0xFFFF) {
 			throw new IllegalArgumentException("File server port is invalid: " + configuredPort);
 		}
 
-		// Config to enable https access to the web-ui
+		// Config to enable https access to the artifact server
 		boolean enableSSL = config.getBoolean(
 				ConfigConstants.MESOS_ARTIFACT_SERVER_SSL_ENABLED,
 				ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_SSL_ENABLED) &&
@@ -136,6 +145,7 @@ public class MesosArtifactServer {
 
 				ch.pipeline()
 					.addLast(new HttpServerCodec())
+					.addLast(new ChunkedWriteHandler())
 					.addLast(handler.name(), handler)
 					.addLast(new UnknownFileHandler());
 			}
@@ -159,11 +169,15 @@ public class MesosArtifactServer {
 
 		String httpProtocol = (serverSSLContext != null) ? "https": "http";
 
-		baseURL = new URL(httpProtocol, serverHostname, port, "/" + sessionID + "/");
+		baseURL = new URL(httpProtocol, serverHostname, port, "/" + prefix + "/");
 
 		LOG.info("Mesos Artifact Server Base URL: {}, listening at {}:{}", baseURL, address, port);
 	}
 
+	public URL baseURL() {
+		return baseURL;
+	}
+
 	/**
 	 * Get the server port on which the artifact server is listening.
 	 */
@@ -185,13 +199,51 @@ public class MesosArtifactServer {
 	 * @param remoteFile the remote path with which to locate the file.
 	 * @return the fully-qualified remote path to the file.
 	 * @throws MalformedURLException if the remote path is invalid.
+     */
+	public synchronized URL addFile(File localFile, String remoteFile) throws IOException, MalformedURLException {
+		return addPath(new Path(localFile.toURI()), new Path(remoteFile));
+	}
+
+	/**
+	 * Adds a path to the artifact server.
+	 * @param path the qualified FS path to serve (local, hdfs, etc).
+	 * @param remoteFile the remote path with which to locate the file.
+	 * @return the fully-qualified remote path to the file.
+	 * @throws MalformedURLException if the remote path is invalid.
 	 */
-	public synchronized URL addFile(File localFile, String remoteFile) throws MalformedURLException {
-		URL fileURL = new URL(baseURL, remoteFile);
-		router.ANY(fileURL.getPath(), new VirtualFileServerHandler(localFile));
+	public synchronized URL addPath(Path path, Path remoteFile) throws IOException, MalformedURLException {
+		if(paths.containsKey(remoteFile)) {
+			throw new IllegalArgumentException("duplicate path registered");
+		}
+		if(remoteFile.isAbsolute()) {
+			throw new IllegalArgumentException("not expecting an absolute path");
+		}
+		URL fileURL = new URL(baseURL, remoteFile.toString());
+		router.ANY(fileURL.getPath(), new VirtualFileServerHandler(path));
+
+		paths.put(remoteFile, fileURL);
+
 		return fileURL;
 	}
 
+	/**
+	 * Removes a path that was previously registered with addPath; unknown paths are ignored.
+	 * @param remoteFile the remote path under which the file was registered.
+	 */
+	public synchronized void removePath(Path remoteFile) {
+		// forget the registration as well, so resolve() stops returning it and it can be re-added
+		URL fileURL = paths.remove(remoteFile);
+		if(fileURL != null) {
+			router.removePath(fileURL.getPath());
+		}
+	}
+
+	@Override
+	public synchronized Option<URL> resolve(Path remoteFile) {
+		// Option.apply turns the null of an unregistered path into an empty option
+		return Option.apply(paths.get(remoteFile));
+	}
+
 	/**
 	 * Stops the artifact server.
 	 * @throws Exception
@@ -215,12 +267,17 @@ public class MesosArtifactServer {
 	@ChannelHandler.Sharable
 	public static class VirtualFileServerHandler extends SimpleChannelInboundHandler<Routed> {
 
-		private final File file;
+		private FileSystem fs;
+		private Path path;
 
-		public VirtualFileServerHandler(File file) {
-			this.file = file;
-			if(!file.exists()) {
-				throw new IllegalArgumentException("no such file: " + file.getAbsolutePath());
+		public VirtualFileServerHandler(Path path) throws IOException {
+			this.path = path;
+			if(!path.isAbsolute()) {
+				throw new IllegalArgumentException("path must be absolute: " + path.toString());
+			}
+			this.fs = path.getFileSystem();
+			if(!fs.exists(path) || fs.getFileStatus(path).isDir()) {
+				throw new IllegalArgumentException("no such file: " + path.toString());
 			}
 		}
 
@@ -230,7 +287,7 @@ public class MesosArtifactServer {
 			HttpRequest request = routed.request();
 
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("{} request for file '{}'", request.getMethod(), file.getAbsolutePath());
+				LOG.debug("{} request for file '{}'", request.getMethod(), path);
 			}
 
 			if(!(request.getMethod() == GET || request.getMethod() == HEAD)) {
@@ -238,47 +295,40 @@ public class MesosArtifactServer {
 				return;
 			}
 
-			final RandomAccessFile raf;
+
+			final FileStatus status;
 			try {
-				raf = new RandomAccessFile(file, "r");
+				status = fs.getFileStatus(path);
 			}
-			catch (FileNotFoundException e) {
+			catch (IOException e) {
+				LOG.error("unable to stat file", e);
 				sendError(ctx, GONE);
 				return;
 			}
-			try {
-				long fileLength = raf.length();
 
-				// compose the response
-				HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
-				if (HttpHeaders.isKeepAlive(request)) {
-					response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
-				}
-				HttpHeaders.setHeader(response, CACHE_CONTROL, "private");
-				HttpHeaders.setHeader(response, CONTENT_TYPE, Mimetypes.MIMETYPE_OCTET_STREAM);
-				HttpHeaders.setContentLength(response, fileLength);
+			// compose the response
+			HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+			HttpHeaders.setHeader(response, CONNECTION, HttpHeaders.Values.CLOSE);
+			HttpHeaders.setHeader(response, CACHE_CONTROL, "private");
+			HttpHeaders.setHeader(response, CONTENT_TYPE, Mimetypes.MIMETYPE_OCTET_STREAM);
+			HttpHeaders.setContentLength(response, status.getLen());
 
-				ctx.write(response);
+			ctx.write(response);
 
-				if (request.getMethod() == GET) {
-					// write the content.  Netty's DefaultFileRegion will close the file.
-					ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise());
+			if (request.getMethod() == GET) {
+				// write the content.  Netty will close the stream.
+				final FSDataInputStream stream = fs.open(path);
+				try {
+					ctx.write(new ChunkedStream(stream));
 				}
-				else {
-					// close the file immediately in HEAD case
-					raf.close();
+				catch(Exception e) {
+					stream.close();
+					throw e;
 				}
-				ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
-
-				// close the connection, if no keep-alive is needed
-				if (!HttpHeaders.isKeepAlive(request)) {
-					lastContentFuture.addListener(ChannelFutureListener.CLOSE);
-				}
-			}
-			catch(Exception ex) {
-				raf.close();
-				throw ex;
 			}
+
+			ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+			lastContentFuture.addListener(ChannelFutureListener.CLOSE);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
index f287e13..93ccf68 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
@@ -33,10 +33,12 @@ import org.apache.flink.mesos.scheduler.LaunchCoordinator;
 import org.apache.flink.mesos.scheduler.TaskMonitor;
 import org.apache.flink.mesos.scheduler.messages.*;
 import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.util.MesosArtifactResolver;
 import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.apache.flink.runtime.clusterframework.messages.*;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -79,6 +81,7 @@ public class MesosFlinkResourceManagerTest {
 
 	private static Configuration config = new Configuration() {{
 		setInteger(ConfigConstants.MESOS_MAX_FAILED_TASKS, -1);
+		setInteger(ConfigConstants.MESOS_INITIAL_TASKS, 0);
 	}};
 
 	@BeforeClass
@@ -107,12 +110,13 @@ public class MesosFlinkResourceManagerTest {
 			MesosWorkerStore workerStore,
 			LeaderRetrievalService leaderRetrievalService,
 			MesosTaskManagerParameters taskManagerParameters,
-			Protos.TaskInfo.Builder taskManagerLaunchContext,
+			ContainerSpecification taskManagerContainerSpec,
+			MesosArtifactResolver artifactResolver,
 			int maxFailedTasks,
 			int numInitialTaskManagers) {
 
 			super(flinkConfig, mesosConfig, workerStore, leaderRetrievalService, taskManagerParameters,
-				taskManagerLaunchContext, maxFailedTasks, numInitialTaskManagers);
+				taskManagerContainerSpec, artifactResolver, maxFailedTasks, numInitialTaskManagers);
 		}
 
 		@Override
@@ -141,6 +145,7 @@ public class MesosFlinkResourceManagerTest {
 		public LeaderRetrievalService retrievalService;
 		public MesosConfiguration mesosConfig;
 		public MesosWorkerStore workerStore;
+		public MesosArtifactResolver artifactResolver;
 		public SchedulerDriver schedulerDriver;
 		public TestingMesosFlinkResourceManager resourceManagerInstance;
 		public ActorGateway resourceManager;
@@ -176,6 +181,9 @@ public class MesosFlinkResourceManagerTest {
 				// worker store
 				workerStore = mock(MesosWorkerStore.class);
 				when(workerStore.getFrameworkID()).thenReturn(Option.<Protos.FrameworkID>empty());
+
+				// artifact resolver
+				artifactResolver = mock(MesosArtifactResolver.class);
 			} catch (Exception ex) {
 				throw new RuntimeException(ex);
 			}
@@ -185,15 +193,16 @@ public class MesosFlinkResourceManagerTest {
 		 * Initialize the resource manager.
 		 */
 		public void initialize() {
+			ContainerSpecification containerSpecification = new ContainerSpecification();
 			ContaineredTaskManagerParameters containeredParams =
 				new ContaineredTaskManagerParameters(1024, 768, 256, 4, new HashMap<String, String>());
-			MesosTaskManagerParameters tmParams = new MesosTaskManagerParameters(1.0, containeredParams);
-			Protos.TaskInfo.Builder taskInfo = Protos.TaskInfo.newBuilder();
+			MesosTaskManagerParameters tmParams = new MesosTaskManagerParameters(
+				1.0, MesosTaskManagerParameters.ContainerType.MESOS, Option.<String>empty(), containeredParams);
 
 			TestActorRef<TestingMesosFlinkResourceManager> resourceManagerRef =
 				TestActorRef.create(system, MesosFlinkResourceManager.createActorProps(
 					TestingMesosFlinkResourceManager.class,
-					config, mesosConfig, workerStore, retrievalService, tmParams, taskInfo, 0, LOG));
+					config, mesosConfig, workerStore, retrievalService, tmParams, containerSpecification, artifactResolver, LOG));
 			resourceManagerInstance = resourceManagerRef.underlyingActor();
 			resourceManager = new AkkaActorGateway(resourceManagerRef, null);
 


[3/3] flink git commit: [test-utils] cleanup and improve method to set the environment

Posted by mx...@apache.org.
[test-utils] cleanup and improve method to set the environment

- introduce a parameter to either update or overwrite the environment
- make Windows-specific code explicit
- avoid duplicate update of environment map


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8d7c3ff0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8d7c3ff0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8d7c3ff0

Branch: refs/heads/master
Commit: 8d7c3ff087ac1390e6da0054ec55974258b6b78c
Parents: 230bf17
Author: Maximilian Michels <mx...@apache.org>
Authored: Mon Dec 5 19:23:32 2016 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Dec 6 00:29:57 2016 +0100

----------------------------------------------------------------------
 .../flink/core/testutils/CommonTestUtils.java   | 52 ++++++++++----------
 1 file changed, 26 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8d7c3ff0/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
index 45c5a77..2eb18c1 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
@@ -30,7 +30,6 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.lang.reflect.Field;
-import java.util.Collections;
 import java.util.Map;
 
 import static org.junit.Assert.fail;
@@ -118,37 +117,38 @@ public class CommonTestUtils {
 		}
 	}
 
-	// This code is taken from: http://stackoverflow.com/a/7201825/568695
+	public static void setEnv(Map<String, String> newenv) {
+		setEnv(newenv, true);
+	}
+
+	// This code is taken, slightly modified, from: http://stackoverflow.com/a/7201825/568695
 	// it changes the environment variables of this JVM. Use only for testing purposes!
 	@SuppressWarnings("unchecked")
-	public static void setEnv(Map<String, String> newenv) {
+	public static void setEnv(Map<String, String> newenv, boolean clearExisting) {
 		try {
+			Map<String, String> env = System.getenv();
+			Class<?> clazz = env.getClass();
+			Field field = clazz.getDeclaredField("m");
+			field.setAccessible(true);
+			Map<String, String> map = (Map<String, String>) field.get(env);
+			if (clearExisting) {
+				map.clear();
+			}
+			map.putAll(newenv);
+
+			// only for Windows
 			Class<?> processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment");
-			Field theEnvironmentField = processEnvironmentClass.getDeclaredField("theEnvironment");
-			theEnvironmentField.setAccessible(true);
-			Map<String, String> env = (Map<String, String>) theEnvironmentField.get(null);
-			env.putAll(newenv);
-			Field theCaseInsensitiveEnvironmentField = processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
-			theCaseInsensitiveEnvironmentField.setAccessible(true);
-			Map<String, String> cienv = (Map<String, String>) theCaseInsensitiveEnvironmentField.get(null);
-			cienv.putAll(newenv);
-		} catch (NoSuchFieldException e) {
 			try {
-				Class<?>[] classes = Collections.class.getDeclaredClasses();
-				Map<String, String> env = System.getenv();
-				for (Class<?> cl : classes) {
-					if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
-						Field field = cl.getDeclaredField("m");
-						field.setAccessible(true);
-						Object obj = field.get(env);
-						Map<String, String> map = (Map<String, String>) obj;
-						map.clear();
-						map.putAll(newenv);
-					}
+				Field theCaseInsensitiveEnvironmentField =
+					processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
+				theCaseInsensitiveEnvironmentField.setAccessible(true);
+				Map<String, String> cienv = (Map<String, String>) theCaseInsensitiveEnvironmentField.get(null);
+				if (clearExisting) {
+					cienv.clear();
 				}
-			} catch (Exception e2) {
-				throw new RuntimeException(e2);
-			}
+				cienv.putAll(newenv);
+			} catch (NoSuchFieldException ignored) {}
+
 		} catch (Exception e1) {
 			throw new RuntimeException(e1);
 		}