You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/19 15:25:52 UTC

[2/2] flink git commit: [FLINK-6630] [FLINK-6631] Implement FLIP-6 Mesos cluster entrypoints + MesosTaskExecutorRunner

[FLINK-6630] [FLINK-6631] Implement FLIP-6 Mesos cluster entrypoints + MesosTaskExecutorRunner

- bin: new entrypoint scripts for flip-6
- ClusterEntrypoint: Refactor the shutdown method
- ClusterEntrypoint: Install default FileSystem (for parity with legacy entrypoints)
- ClusterEntrypoint: new MesosJobClusterEntrypoint, MesosSessionClusterEntrypoint, MesosEntrypointUtils, MesosTaskExecutorRunner
- MesosServices: enhanced with artifactServer, localActorSystem
- MesosResourceManager: Fallback to old TM params when UNKNOWN resource profile is provided
- MesosResourceManager: config setting for taskmanager startup script (mesos.resourcemanager.tasks.taskmanager-cmd)
- test: added a 'noop' job graph for testing purposes

This closes #4555.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bbac4a6c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bbac4a6c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bbac4a6c

Branch: refs/heads/master
Commit: bbac4a6c922199db08a5244d0fa1262a5f16d479
Parents: 76f1022
Author: Wright, Eron <er...@emc.com>
Authored: Wed Aug 16 14:30:24 2017 -0700
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sat Aug 19 17:25:21 2017 +0200

----------------------------------------------------------------------
 .../mesos-bin/mesos-appmaster-flip6-job.sh      |  47 +++++
 .../mesos-bin/mesos-appmaster-flip6-session.sh  |  47 +++++
 .../mesos-bin/mesos-taskmanager-flip6.sh        |  45 ++++
 .../mesos/entrypoint/MesosEntrypointUtils.java  | 177 ++++++++++++++++
 .../entrypoint/MesosJobClusterEntrypoint.java   | 204 +++++++++++++++++++
 .../MesosSessionClusterEntrypoint.java          | 178 ++++++++++++++++
 .../entrypoint/MesosTaskExecutorRunner.java     | 132 ++++++++++++
 .../clusterframework/LaunchableMesosWorker.java |  30 ++-
 .../MesosApplicationMasterRunner.java           | 131 +-----------
 .../clusterframework/MesosResourceManager.java  |  47 +++--
 .../MesosTaskManagerParameters.java             |  20 +-
 .../services/AbstractMesosServices.java         |  73 +++++++
 .../services/MesosServices.java                 |  17 ++
 .../services/MesosServicesUtils.java            |  28 ++-
 .../services/StandaloneMesosServices.java       |  10 +-
 .../services/ZooKeeperMesosServices.java        |  30 ++-
 .../MesosFlinkResourceManagerTest.java          |   1 +
 .../MesosResourceManagerTest.java               |  48 +++--
 .../runtime/entrypoint/ClusterEntrypoint.java   |  27 +++
 .../entrypoint/JobClusterEntrypoint.java        |  10 +-
 .../entrypoint/SessionClusterEntrypoint.java    |   8 +-
 .../test/runtime/entrypoint/StreamingNoop.java  |  60 ++++++
 22 files changed, 1186 insertions(+), 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster-flip6-job.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster-flip6-job.sh b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster-flip6-job.sh
new file mode 100755
index 0000000..b21670a
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster-flip6-job.sh
@@ -0,0 +1,47 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Resolve the absolute directory that contains this script
+bin=$(dirname "$0")
+bin=$(cd "$bin"; pwd)
+
+# get Flink config
+. "$bin"/config.sh
+
+# Fall back to the current user when no identity string is set
+if [ -z "$FLINK_IDENT_STRING" ]; then
+    FLINK_IDENT_STRING="$USER"
+fi
+
+CC_CLASSPATH=$(manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS)
+
+log="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-mesos-appmaster-${HOSTNAME}.log"
+log_setting="-Dlog.file=${log} -Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j.properties -Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback.xml"
+
+export FLINK_CONF_DIR
+export FLINK_BIN_DIR
+export FLINK_LIB_DIR
+
+# 'exec' replaces this shell with the JVM, so the exit status of the script is
+# the exit status of the JVM. Nothing placed after this line can run: on success
+# the shell no longer exists, and if the command cannot be executed a
+# non-interactive bash exits right away (unless 'execfail' is set). For that
+# reason no return-code handling follows the call.
+exec $JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.mesos.entrypoint.MesosJobClusterEntrypoint "$@"

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster-flip6-session.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster-flip6-session.sh b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster-flip6-session.sh
new file mode 100755
index 0000000..b9e0f53
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster-flip6-session.sh
@@ -0,0 +1,47 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Resolve the absolute directory that contains this script
+bin=$(dirname "$0")
+bin=$(cd "$bin"; pwd)
+
+# get Flink config
+. "$bin"/config.sh
+
+# Fall back to the current user when no identity string is set
+if [ -z "$FLINK_IDENT_STRING" ]; then
+    FLINK_IDENT_STRING="$USER"
+fi
+
+CC_CLASSPATH=$(manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS)
+
+log="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-mesos-appmaster-${HOSTNAME}.log"
+log_setting="-Dlog.file=${log} -Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j.properties -Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback.xml"
+
+export FLINK_CONF_DIR
+export FLINK_BIN_DIR
+export FLINK_LIB_DIR
+
+# 'exec' replaces this shell with the JVM, so the exit status of the script is
+# the exit status of the JVM. Nothing placed after this line can run: on success
+# the shell no longer exists, and if the command cannot be executed a
+# non-interactive bash exits right away (unless 'execfail' is set). For that
+# reason no return-code handling follows the call.
+exec $JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint "$@"

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager-flip6.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager-flip6.sh b/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager-flip6.sh
new file mode 100755
index 0000000..f251442
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager-flip6.sh
@@ -0,0 +1,45 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+# get Flink config
+. "$bin"/config.sh
+
+CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`
+
+log=flink-taskmanager.log
+log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml"
+
+# Add precomputed memory JVM options
+if [ -z "${FLINK_ENV_JAVA_OPTS_MEM}" ]; then
+    FLINK_ENV_JAVA_OPTS_MEM=""
+fi
+export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_MEM}"
+
+# Add TaskManager-specific JVM options
+export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"
+
+export FLINK_CONF_DIR
+export FLINK_BIN_DIR
+export FLINK_LIB_DIR
+
+exec $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner "$@"
+

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
new file mode 100755
index 0000000..0d81ead
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.mesos.configuration.MesosOptions;
+import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.overlays.CompositeContainerOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.HadoopConfOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.HadoopUserOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Utils for Mesos entrypoints.
+ *
+ * <p>All members are static; the class is not meant to be instantiated.
+ */
+public class MesosEntrypointUtils {
+
+	/** Prevents instantiation of this static-only utility class. */
+	private MesosEntrypointUtils() {
+	}
+
+	/**
+	 * Loads the global configuration and adds the dynamic properties parsed from
+	 * the given command line.
+	 *
+	 * @param cmd command line to parse for dynamic properties
+	 * @return Global configuration with dynamic properties set
+	 * @deprecated replace once FLINK-7269 has been merged
+	 */
+	@Deprecated
+	public static Configuration loadConfiguration(CommandLine cmd) {
+
+		// merge the dynamic properties from the command-line; they must be registered
+		// before the global configuration is loaded
+		Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
+		GlobalConfiguration.setDynamicProperties(dynamicProperties);
+
+		return GlobalConfiguration.loadConfiguration();
+	}
+
+	/**
+	 * Loads and validates the Mesos scheduler configuration.
+	 *
+	 * @param flinkConfig the global configuration.
+	 * @param hostname the hostname to advertise to the Mesos master.
+	 * @return the Mesos configuration built from the master URL, the framework info and
+	 *         the (optional) credential
+	 * @throws IllegalConfigurationException if the Mesos master URL is not configured
+	 */
+	public static MesosConfiguration createMesosSchedulerConfiguration(Configuration flinkConfig, String hostname) {
+
+		Protos.FrameworkInfo.Builder frameworkInfo = Protos.FrameworkInfo.newBuilder()
+			.setHostname(hostname);
+		// stays null (i.e. an empty Option) unless a framework principal is configured
+		Protos.Credential.Builder credential = null;
+
+		if (!flinkConfig.contains(MesosOptions.MASTER_URL)) {
+			throw new IllegalConfigurationException(MesosOptions.MASTER_URL.key() + " must be configured.");
+		}
+		String masterUrl = flinkConfig.getString(MesosOptions.MASTER_URL);
+
+		Duration failoverTimeout = FiniteDuration.apply(
+			flinkConfig.getInteger(MesosOptions.FAILOVER_TIMEOUT_SECONDS),
+			TimeUnit.SECONDS);
+		frameworkInfo.setFailoverTimeout(failoverTimeout.toSeconds());
+
+		frameworkInfo.setName(flinkConfig.getString(
+			MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME));
+
+		frameworkInfo.setRole(flinkConfig.getString(
+			MesosOptions.RESOURCEMANAGER_FRAMEWORK_ROLE));
+
+		frameworkInfo.setUser(flinkConfig.getString(
+			MesosOptions.RESOURCEMANAGER_FRAMEWORK_USER));
+
+		if (flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) {
+			frameworkInfo.setPrincipal(flinkConfig.getString(
+				MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL));
+
+			credential = Protos.Credential.newBuilder();
+			credential.setPrincipal(frameworkInfo.getPrincipal());
+
+			// some environments use a side-channel to communicate the secret to Mesos,
+			// and thus don't set the 'secret' configuration setting
+			if (flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_SECRET)) {
+				credential.setSecret(flinkConfig.getString(
+					MesosOptions.RESOURCEMANAGER_FRAMEWORK_SECRET));
+			}
+		}
+
+		return new MesosConfiguration(masterUrl, frameworkInfo, scala.Option.apply(credential));
+	}
+
+	/**
+	 * Creates the Mesos TaskManager parameters from the given configuration and logs the
+	 * resulting number of slots, memory sizes and cpus.
+	 *
+	 * @param configuration the configuration to read the TaskManager settings from
+	 * @param log the logger that receives the summary of the parameters
+	 * @return the TaskManager parameters
+	 */
+	public static MesosTaskManagerParameters createTmParameters(Configuration configuration, Logger log) {
+		// TM configuration
+		final MesosTaskManagerParameters taskManagerParameters = MesosTaskManagerParameters.create(configuration);
+
+		log.info("TaskManagers will be created with {} task slots",
+			taskManagerParameters.containeredParameters().numSlots());
+		log.info("TaskManagers will be started with container size {} MB, JVM heap size {} MB, " +
+				"JVM direct memory limit {} MB, {} cpus",
+			taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB(),
+			taskManagerParameters.containeredParameters().taskManagerHeapSizeMB(),
+			taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB(),
+			taskManagerParameters.cpus());
+
+		return taskManagerParameters;
+	}
+
+	/**
+	 * Creates a container specification for TaskManagers: the dynamic properties are added
+	 * to the specification's dynamic configuration and the overlays are applied afterwards.
+	 *
+	 * @param configuration the configuration the overlays are derived from
+	 * @param dynamicProperties the dynamic properties to propagate to the TaskManagers
+	 * @return the populated container specification
+	 * @throws Exception if the specification could not be created
+	 */
+	public static ContainerSpecification createContainerSpec(Configuration configuration, Configuration dynamicProperties)
+		throws Exception {
+		// generate a container spec which conveys the artifacts/vars needed to launch a TM
+		ContainerSpecification spec = new ContainerSpecification();
+
+		// propagate the AM dynamic configuration to the TM
+		spec.getDynamicConfiguration().addAll(dynamicProperties);
+
+		applyOverlays(configuration, spec);
+
+		return spec;
+	}
+
+	/**
+	 * Generate a container specification as a TaskManager template.
+	 *
+	 * <p>This code is extremely Mesos-specific and registers all the artifacts that the TaskManager
+	 * needs (such as JAR file, config file, ...) and all environment variables into a container specification.
+	 * The Mesos fetcher then ensures that those artifacts will be copied into the task's sandbox directory.
+	 * A lightweight HTTP server serves the artifacts to the fetcher.
+	 *
+	 * @param configuration the configuration the overlays are built from
+	 * @param containerSpec the container specification that is modified in place
+	 * @throws IOException if an overlay could not be built or applied
+	 */
+	public static void applyOverlays(
+		Configuration configuration, ContainerSpecification containerSpec) throws IOException {
+
+		// create the overlays that will produce the specification
+		CompositeContainerOverlay overlay = new CompositeContainerOverlay(
+			FlinkDistributionOverlay.newBuilder().fromEnvironment(configuration).build(),
+			HadoopConfOverlay.newBuilder().fromEnvironment(configuration).build(),
+			HadoopUserOverlay.newBuilder().fromEnvironment(configuration).build(),
+			KeytabOverlay.newBuilder().fromEnvironment(configuration).build(),
+			Krb5ConfOverlay.newBuilder().fromEnvironment(configuration).build(),
+			SSLStoreOverlay.newBuilder().fromEnvironment(configuration).build()
+		);
+
+		// apply the overlays
+		overlay.configure(containerSpec);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
new file mode 100755
index 0000000..890c4a7
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager;
+import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Cluster entrypoint for running a single job on Mesos. The job graph is read
+ * from a file in serialized form.
+ */
+public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
+
+	/** Configuration key under which the location of the serialized job graph is looked up. */
+	public static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
+
+	// ------------------------------------------------------------------------
+	//  Command-line options
+	// ------------------------------------------------------------------------
+
+	private static final Options ALL_OPTIONS =
+		new Options().addOption(BootstrapTools.newDynamicPropertiesOption());
+
+	// ------------------------------------------------------------------------
+	//  State created in initializeServices
+	// ------------------------------------------------------------------------
+
+	private MesosConfiguration schedulerConfiguration;
+
+	private MesosServices mesosServices;
+
+	private MesosTaskManagerParameters taskManagerParameters;
+
+	private ContainerSpecification taskManagerContainerSpec;
+
+	public MesosJobClusterEntrypoint(Configuration config) {
+		super(config);
+	}
+
+	/**
+	 * Initializes the base services and then the Mesos-specific pieces: scheduler
+	 * configuration, Mesos services, TaskManager parameters and container specification.
+	 */
+	@Override
+	protected void initializeServices(Configuration config) throws Exception {
+		super.initializeServices(config);
+
+		// the configured JobManager address doubles as the hostname used for Mesos
+		final String advertisedHost = config.getString(JobManagerOptions.ADDRESS);
+
+		schedulerConfiguration = MesosEntrypointUtils.createMesosSchedulerConfiguration(config, advertisedHost);
+		mesosServices = MesosServicesUtils.createMesosServices(config, advertisedHost);
+
+		taskManagerParameters = MesosEntrypointUtils.createTmParameters(config, LOG);
+		taskManagerContainerSpec = MesosEntrypointUtils.createContainerSpec(
+			config,
+			GlobalConfiguration.getDynamicProperties());
+	}
+
+	/** Delegates unchanged to the parent implementation. */
+	@Override
+	protected void startClusterComponents(
+		Configuration configuration,
+		RpcService rpcService,
+		HighAvailabilityServices highAvailabilityServices,
+		BlobServer blobServer,
+		HeartbeatServices heartbeatServices,
+		MetricRegistry metricRegistry) throws Exception {
+		super.startClusterComponents(
+			configuration,
+			rpcService,
+			highAvailabilityServices,
+			blobServer,
+			heartbeatServices,
+			metricRegistry);
+	}
+
+	/**
+	 * Builds the {@link MesosResourceManager} from the given services and the Mesos
+	 * state prepared in {@link #initializeServices(Configuration)}.
+	 */
+	@Override
+	protected ResourceManager<?> createResourceManager(
+		Configuration configuration,
+		ResourceID resourceId,
+		RpcService rpcService,
+		HighAvailabilityServices highAvailabilityServices,
+		HeartbeatServices heartbeatServices,
+		MetricRegistry metricRegistry,
+		FatalErrorHandler fatalErrorHandler) throws Exception {
+
+		final ResourceManagerConfiguration resourceManagerConfig =
+			ResourceManagerConfiguration.fromConfiguration(configuration);
+		final ResourceManagerRuntimeServicesConfiguration runtimeServicesConfig =
+			ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
+		final ResourceManagerRuntimeServices runtimeServices =
+			ResourceManagerRuntimeServices.fromConfiguration(
+				runtimeServicesConfig,
+				highAvailabilityServices,
+				rpcService.getScheduledExecutor());
+
+		return new MesosResourceManager(
+			rpcService,
+			ResourceManager.RESOURCE_MANAGER_NAME,
+			resourceId,
+			resourceManagerConfig,
+			highAvailabilityServices,
+			heartbeatServices,
+			runtimeServices.getSlotManager(),
+			metricRegistry,
+			runtimeServices.getJobLeaderIdService(),
+			fatalErrorHandler,
+			configuration,
+			mesosServices,
+			schedulerConfiguration,
+			taskManagerParameters,
+			taskManagerContainerSpec);
+	}
+
+	/**
+	 * Reads the serialized job graph from the file named by {@link #JOB_GRAPH_FILE_PATH}
+	 * (default: {@code job.graph}).
+	 *
+	 * @throws FlinkException if the file is missing or cannot be deserialized
+	 */
+	@Override
+	protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException {
+		final File serializedGraph = new File(configuration.getString(JOB_GRAPH_FILE_PATH, "job.graph"));
+
+		try (FileInputStream fileStream = new FileInputStream(serializedGraph);
+			ObjectInputStream objectStream = new ObjectInputStream(fileStream)) {
+
+			return (JobGraph) objectStream.readObject();
+		} catch (FileNotFoundException e) {
+			throw new FlinkException("Could not find the JobGraph file.", e);
+		} catch (ClassNotFoundException | IOException e) {
+			throw new FlinkException("Could not load the JobGraph from file.", e);
+		}
+	}
+
+	/**
+	 * Stops the base cluster components and then closes the Mesos services. Both steps are
+	 * always attempted; failures are collected and reported together at the end.
+	 */
+	@Override
+	protected void stopClusterComponents(boolean cleanupHaData) throws Exception {
+		Throwable collected = null;
+
+		try {
+			super.stopClusterComponents(cleanupHaData);
+		} catch (Throwable failure) {
+			collected = ExceptionUtils.firstOrSuppressed(failure, collected);
+		}
+
+		if (mesosServices != null) {
+			try {
+				mesosServices.close(cleanupHaData);
+			} catch (Throwable failure) {
+				collected = ExceptionUtils.firstOrSuppressed(failure, collected);
+			}
+		}
+
+		if (collected != null) {
+			throw new FlinkException("Could not properly shut down the Mesos job cluster entry point.", collected);
+		}
+	}
+
+	/**
+	 * Parses the command line, loads the configuration including dynamic properties and
+	 * starts the cluster. Exits with the startup failure code if parsing fails.
+	 */
+	public static void main(String[] args) {
+		final CommandLineParser commandLineParser = new PosixParser();
+		final CommandLine commandLine;
+		try {
+			commandLine = commandLineParser.parse(ALL_OPTIONS, args);
+		} catch (Exception e) {
+			LOG.error("Could not parse the command-line options.", e);
+			System.exit(STARTUP_FAILURE_RETURN_CODE);
+			return;
+		}
+
+		// load configuration incl. dynamic properties
+		final Configuration loadedConfiguration = MesosEntrypointUtils.loadConfiguration(commandLine);
+
+		new MesosJobClusterEntrypoint(loadedConfiguration).startCluster();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
new file mode 100755
index 0000000..67f5899
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager;
+import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+
+/**
+ * Entry point for Mesos session clusters.
+ */
+public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint {
+
+	// ------------------------------------------------------------------------
+	//  Command-line options
+	// ------------------------------------------------------------------------
+
+	private static final Options ALL_OPTIONS;
+
+	static {
+		ALL_OPTIONS =
+			new Options()
+				.addOption(BootstrapTools.newDynamicPropertiesOption());
+	}
+
+	private MesosConfiguration mesosConfig;
+
+	private MesosServices mesosServices;
+
+	private MesosTaskManagerParameters taskManagerParameters;
+
+	private ContainerSpecification taskManagerContainerSpec;
+
+	public MesosSessionClusterEntrypoint(Configuration config) {
+		super(config);
+	}
+
+	/**
+	 * Initializes the base services and then the Mesos-specific pieces: scheduler
+	 * configuration, Mesos services, TaskManager parameters and container specification.
+	 */
+	@Override
+	protected void initializeServices(Configuration config) throws Exception {
+		super.initializeServices(config);
+
+		final String hostname = config.getString(JobManagerOptions.ADDRESS);
+
+		// Mesos configuration
+		mesosConfig = MesosEntrypointUtils.createMesosSchedulerConfiguration(config, hostname);
+
+		// services
+		mesosServices = MesosServicesUtils.createMesosServices(config, hostname);
+
+		// TM configuration
+		taskManagerParameters = MesosEntrypointUtils.createTmParameters(config, LOG);
+		taskManagerContainerSpec = MesosEntrypointUtils.createContainerSpec(config, GlobalConfiguration.getDynamicProperties());
+	}
+
+	/** Delegates unchanged to the parent implementation. */
+	@Override
+	protected void startClusterComponents(Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry) throws Exception {
+		super.startClusterComponents(configuration, rpcService, highAvailabilityServices, blobServer, heartbeatServices, metricRegistry);
+	}
+
+	/**
+	 * Builds the {@link MesosResourceManager} from the given services and the Mesos
+	 * state prepared in {@link #initializeServices(Configuration)}.
+	 */
+	@Override
+	protected ResourceManager<?> createResourceManager(
+		Configuration configuration,
+		ResourceID resourceId,
+		RpcService rpcService,
+		HighAvailabilityServices highAvailabilityServices,
+		HeartbeatServices heartbeatServices,
+		MetricRegistry metricRegistry,
+		FatalErrorHandler fatalErrorHandler) throws Exception {
+		final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
+		final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
+		final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
+			rmServicesConfiguration,
+			highAvailabilityServices,
+			rpcService.getScheduledExecutor());
+
+		return new MesosResourceManager(
+			rpcService,
+			ResourceManager.RESOURCE_MANAGER_NAME,
+			resourceId,
+			rmConfiguration,
+			highAvailabilityServices,
+			heartbeatServices,
+			rmRuntimeServices.getSlotManager(),
+			metricRegistry,
+			rmRuntimeServices.getJobLeaderIdService(),
+			fatalErrorHandler,
+			configuration,
+			mesosServices,
+			mesosConfig,
+			taskManagerParameters,
+			taskManagerContainerSpec
+			);
+	}
+
+	/**
+	 * Stops the base cluster components and then closes the Mesos services. Both steps are
+	 * always attempted. The first failure becomes the cause of the thrown exception; a later
+	 * failure is attached to it as a suppressed exception instead of replacing it.
+	 *
+	 * @param cleanupHaData passed on to the parent and to the Mesos services
+	 * @throws Exception if any of the shutdown steps failed
+	 */
+	@Override
+	protected void stopClusterComponents(boolean cleanupHaData) throws Exception {
+		Throwable exception = null;
+
+		try {
+			super.stopClusterComponents(cleanupHaData);
+		} catch (Throwable t) {
+			exception = t;
+		}
+
+		if (mesosServices != null) {
+			try {
+				mesosServices.close(cleanupHaData);
+			} catch (Throwable t) {
+				if (exception == null) {
+					exception = t;
+				} else if (exception != t) {
+					// keep the earlier failure as primary cause; addSuppressed rejects self-suppression
+					exception.addSuppressed(t);
+				}
+			}
+		}
+
+		if (exception != null) {
+			throw new FlinkException("Could not properly shut down the Mesos session cluster entry point.", exception);
+		}
+	}
+
+	/**
+	 * Parses the command line, loads the configuration including dynamic properties and
+	 * starts the cluster. Exits with the startup failure code if parsing fails.
+	 */
+	public static void main(String[] args) {
+		// load configuration incl. dynamic properties
+		CommandLineParser parser = new PosixParser();
+		CommandLine cmd;
+		try {
+			cmd = parser.parse(ALL_OPTIONS, args);
+		}
+		catch (Exception e){
+			LOG.error("Could not parse the command-line options.", e);
+			System.exit(STARTUP_FAILURE_RETURN_CODE);
+			return;
+		}
+
+		Configuration configuration = MesosEntrypointUtils.loadConfiguration(cmd);
+
+		MesosSessionClusterEntrypoint clusterEntrypoint = new MesosSessionClusterEntrypoint(configuration);
+
+		clusterEntrypoint.startCluster();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
new file mode 100644
index 0000000..c4343d2
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.entrypoint;
+
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.mesos.runtime.clusterframework.MesosConfigKeys;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+/**
+ * The entry point for running a TaskManager in a Mesos container.
+ *
+ * <p>The runner reads the dynamic properties from the command line, picks up the temporary
+ * directories and the container id from the process environment, installs the security
+ * context and then runs the TaskManager inside of that context.
+ */
+public class MesosTaskExecutorRunner {
+
+	private static final Logger LOG = LoggerFactory.getLogger(MesosTaskExecutorRunner.class);
+
+	/** Process exit code used when the TaskManager cannot be initialized or started. */
+	private static final int INIT_ERROR_EXIT_CODE = 31;
+
+	/** The command-line options understood by this entry point (dynamic properties only). */
+	private static final Options ALL_OPTIONS;
+
+	static {
+		ALL_OPTIONS =
+			new Options()
+				.addOption(BootstrapTools.newDynamicPropertiesOption());
+	}
+
+	/**
+	 * Configures and runs the TaskManager.
+	 *
+	 * <p>Failures while parsing the command line, loading the configuration or running the
+	 * TaskManager are logged and terminate the process with {@link #INIT_ERROR_EXIT_CODE}.
+	 *
+	 * @param args the command-line arguments, carrying the dynamic properties
+	 * @throws Exception if the default filesystem scheme cannot be set, the container id
+	 *                   environment variable is missing, or the security context cannot be installed
+	 */
+	public static void main(String[] args) throws Exception {
+		EnvironmentInformation.logEnvironmentInfo(LOG, MesosTaskExecutorRunner.class.getSimpleName(), args);
+		SignalHandler.register(LOG);
+		JvmShutdownSafeguard.installAsShutdownHook(LOG);
+
+		// try to parse the command line arguments; a malformed command line is reported
+		// through the logger and the init error exit code instead of escaping from main
+		final CommandLine cmd;
+		try {
+			CommandLineParser parser = new PosixParser();
+			cmd = parser.parse(ALL_OPTIONS, args);
+		}
+		catch (Throwable t) {
+			LOG.error("Could not parse the command-line options.", t);
+			System.exit(INIT_ERROR_EXIT_CODE);
+			return;
+		}
+
+		final Configuration configuration;
+		try {
+			configuration = MesosEntrypointUtils.loadConfiguration(cmd);
+		}
+		catch (Throwable t) {
+			LOG.error("Failed to load the TaskManager configuration and dynamic properties.", t);
+			System.exit(INIT_ERROR_EXIT_CODE);
+			return;
+		}
+
+		// read the environment variables
+		final Map<String, String> envs = System.getenv();
+		final String tmpDirs = envs.get(MesosConfigKeys.ENV_FLINK_TMP_DIR);
+
+		// configure local directory: an explicit Flink setting wins over the environment
+		String flinkTempDirs = configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null);
+		if (flinkTempDirs != null) {
+			LOG.info("Overriding Mesos temporary file directories with those " +
+				"specified in the Flink config: {}", flinkTempDirs);
+		}
+		else if (tmpDirs != null) {
+			LOG.info("Setting directories for temporary files to: {}", tmpDirs);
+			configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, tmpDirs);
+		}
+
+		// configure the default filesystem
+		try {
+			FileSystem.setDefaultScheme(configuration);
+		} catch (IOException e) {
+			throw new IOException("Error while setting the default " +
+				"filesystem scheme from configuration.", e);
+		}
+
+		// tell akka to die in case of an error
+		configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
+
+		// Infer the resource identifier from the environment variable
+		String containerID = Preconditions.checkNotNull(
+			envs.get(MesosConfigKeys.ENV_FLINK_CONTAINER_ID),
+			"The environment variable " + MesosConfigKeys.ENV_FLINK_CONTAINER_ID + " must be set.");
+		final ResourceID resourceId = new ResourceID(containerID);
+		LOG.info("ResourceID assigned for this container: {}", resourceId);
+
+		// Run the TM in the security context
+		SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(configuration);
+		SecurityUtils.install(sc);
+
+		try {
+			SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
+				@Override
+				public Integer call() throws Exception {
+					TaskManagerRunner.runTaskManager(configuration, resourceId);
+
+					return 0;
+				}
+			});
+		}
+		catch (Throwable t) {
+			LOG.error("Error while starting the TaskManager", t);
+			System.exit(INIT_ERROR_EXIT_CODE);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index ce7bb9d..2c32507 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.mesos.Utils;
 import org.apache.flink.mesos.scheduler.LaunchableTask;
 import org.apache.flink.mesos.util.MesosArtifactResolver;
+import org.apache.flink.mesos.util.MesosArtifactServer;
 import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -36,6 +37,7 @@ import org.apache.mesos.Protos;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -261,12 +263,15 @@ public class LaunchableMesosWorker implements LaunchableTask {
 		env.addVariables(variable(MesosConfigKeys.ENV_FRAMEWORK_NAME, mesosConfiguration.frameworkInfo().getName()));
 
 		// build the launch command w/ dynamic application properties
-		Option<String> bootstrapCmdOption = params.bootstrapCommand();
-
-		final String bootstrapCommand = bootstrapCmdOption.isDefined() ? bootstrapCmdOption.get() + " && " : "";
-		final String launchCommand = bootstrapCommand + "$FLINK_HOME/bin/mesos-taskmanager.sh " + ContainerSpecification.formatSystemProperties(dynamicProperties);
-
-		cmd.setValue(launchCommand);
+		StringBuilder launchCommand = new StringBuilder();
+		if (params.bootstrapCommand().isDefined()) {
+			launchCommand.append(params.bootstrapCommand().get()).append(" && ");
+		}
+		launchCommand
+			.append(params.command())
+			.append(" ")
+			.append(ContainerSpecification.formatSystemProperties(dynamicProperties));
+		cmd.setValue(launchCommand.toString());
 
 		// build the container info
 		Protos.ContainerInfo.Builder containerInfo = Protos.ContainerInfo.newBuilder();
@@ -312,4 +317,17 @@ public class LaunchableMesosWorker implements LaunchableTask {
 			"taskRequest=" + taskRequest +
 			'}';
 	}
+
+	/**
+	 * Registers every artifact of a container specification with an artifact server,
+	 * so that the server serves the artifacts associated with the container environment.
+	 *
+	 * @param server the server to configure.
+	 * @param container the container with artifacts to serve.
+	 * @throws IOException if the artifacts cannot be accessed.
+	 */
+	static void configureArtifactServer(MesosArtifactServer server, ContainerSpecification container) throws IOException {
+		// each artifact is published under its destination path
+		for (final ContainerSpecification.Artifact entry : container.getArtifacts()) {
+			server.addPath(entry.source, entry.dest);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
old mode 100644
new mode 100755
index 7891386..3d16a66
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -21,11 +21,10 @@ package org.apache.flink.mesos.runtime.clusterframework;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.mesos.configuration.MesosOptions;
+import org.apache.flink.mesos.entrypoint.MesosEntrypointUtils;
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
@@ -34,13 +33,6 @@ import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
-import org.apache.flink.runtime.clusterframework.overlays.CompositeContainerOverlay;
-import org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay;
-import org.apache.flink.runtime.clusterframework.overlays.HadoopConfOverlay;
-import org.apache.flink.runtime.clusterframework.overlays.HadoopUserOverlay;
-import org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay;
-import org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay;
-import org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
@@ -65,7 +57,6 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
-import org.apache.mesos.Protos;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,7 +64,6 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.URL;
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -81,7 +71,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import scala.Option;
-import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 import static org.apache.flink.util.Preconditions.checkState;
@@ -222,7 +211,7 @@ public class MesosApplicationMasterRunner {
 			LOG.info("App Master Hostname to use: {}", appMasterHostname);
 
 			// Mesos configuration
-			final MesosConfiguration mesosConfig = createMesosConfig(config, appMasterHostname);
+			final MesosConfiguration mesosConfig = MesosEntrypointUtils.createMesosSchedulerConfiguration(config, appMasterHostname);
 
 			// JM configuration
 			int numberProcessors = Hardware.getNumberCPUCores();
@@ -235,19 +224,10 @@ public class MesosApplicationMasterRunner {
 				numberProcessors,
 				new ExecutorThreadFactory("mesos-jobmanager-io"));
 
-			mesosServices = MesosServicesUtils.createMesosServices(config);
+			mesosServices = MesosServicesUtils.createMesosServices(config, appMasterHostname);
 
 			// TM configuration
-			final MesosTaskManagerParameters taskManagerParameters = MesosTaskManagerParameters.create(config);
-
-			LOG.info("TaskManagers will be created with {} task slots",
-				taskManagerParameters.containeredParameters().numSlots());
-			LOG.info("TaskManagers will be started with container size {} MB, JVM heap size {} MB, " +
-					"JVM direct memory limit {} MB, {} cpus",
-				taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB(),
-				taskManagerParameters.containeredParameters().taskManagerHeapSizeMB(),
-				taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB(),
-				taskManagerParameters.cpus());
+			final MesosTaskManagerParameters taskManagerParameters = MesosEntrypointUtils.createTmParameters(config, LOG);
 
 			// JM endpoint, which should be explicitly configured based on acquired net resources
 			final int listeningPort = config.getInteger(JobManagerOptions.PORT);
@@ -268,9 +248,7 @@ public class MesosApplicationMasterRunner {
 
 			// try to start the artifact server
 			LOG.debug("Starting Artifact Server");
-			final int artifactServerPort = config.getInteger(MesosOptions.ARTIFACT_SERVER_PORT);
-			final String artifactServerPrefix = UUID.randomUUID().toString();
-			artifactServer = new MesosArtifactServer(artifactServerPrefix, akkaHostname, artifactServerPort, config);
+			artifactServer = mesosServices.getArtifactServer();
 
 			// ----------------- (3) Generate the configuration for the TaskManagers -------------------
 
@@ -287,10 +265,10 @@ public class MesosApplicationMasterRunner {
 			taskManagerContainerSpec.getDynamicConfiguration().addAll(taskManagerConfig);
 
 			// apply the overlays
-			applyOverlays(config, taskManagerContainerSpec);
+			MesosEntrypointUtils.applyOverlays(config, taskManagerContainerSpec);
 
 			// configure the artifact server to serve the specified artifacts
-			configureArtifactServer(artifactServer, taskManagerContainerSpec);
+			LaunchableMesosWorker.configureArtifactServer(artifactServer, taskManagerContainerSpec);
 
 			// ----------------- (4) start the actors -------------------
 
@@ -386,14 +364,6 @@ public class MesosApplicationMasterRunner {
 				}
 			}
 
-			if (artifactServer != null) {
-				try {
-					artifactServer.stop();
-				} catch (Throwable ignored) {
-					LOG.error("Failed to stop the artifact server", ignored);
-				}
-			}
-
 			if (actorSystem != null) {
 				try {
 					actorSystem.shutdown();
@@ -444,12 +414,6 @@ public class MesosApplicationMasterRunner {
 			}
 		}
 
-		try {
-			artifactServer.stop();
-		} catch (Throwable t) {
-			LOG.error("Failed to stop the artifact server", t);
-		}
-
 		if (highAvailabilityServices != null) {
 			try {
 				highAvailabilityServices.close();
@@ -490,85 +454,4 @@ public class MesosApplicationMasterRunner {
 		return MemoryArchivist.class;
 	}
 
-	/**
-	 * Loads and validates the ResourceManager Mesos configuration from the given Flink configuration.
-	 */
-	public static MesosConfiguration createMesosConfig(Configuration flinkConfig, String hostname) {
-
-		Protos.FrameworkInfo.Builder frameworkInfo = Protos.FrameworkInfo.newBuilder()
-			.setHostname(hostname);
-		Protos.Credential.Builder credential = null;
-
-		if (!flinkConfig.contains(MesosOptions.MASTER_URL)) {
-			throw new IllegalConfigurationException(MesosOptions.MASTER_URL.key() + " must be configured.");
-		}
-		String masterUrl = flinkConfig.getString(MesosOptions.MASTER_URL);
-
-		Duration failoverTimeout = FiniteDuration.apply(
-			flinkConfig.getInteger(
-				MesosOptions.FAILOVER_TIMEOUT_SECONDS),
-			TimeUnit.SECONDS);
-		frameworkInfo.setFailoverTimeout(failoverTimeout.toSeconds());
-
-		frameworkInfo.setName(flinkConfig.getString(
-			MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME));
-
-		frameworkInfo.setRole(flinkConfig.getString(
-			MesosOptions.RESOURCEMANAGER_FRAMEWORK_ROLE));
-
-		frameworkInfo.setUser(flinkConfig.getString(
-			MesosOptions.RESOURCEMANAGER_FRAMEWORK_USER));
-
-		if (flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) {
-			frameworkInfo.setPrincipal(flinkConfig.getString(
-				MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL));
-
-			credential = Protos.Credential.newBuilder();
-			credential.setPrincipal(frameworkInfo.getPrincipal());
-
-			// some environments use a side-channel to communicate the secret to Mesos,
-			// and thus don't set the 'secret' configuration setting
-			if (flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_SECRET)) {
-				credential.setSecret(flinkConfig.getString(
-					MesosOptions.RESOURCEMANAGER_FRAMEWORK_SECRET));
-			}
-		}
-
-		MesosConfiguration mesos =
-			new MesosConfiguration(masterUrl, frameworkInfo, scala.Option.apply(credential));
-
-		return mesos;
-	}
-
-	/**
-	 * Generate a container specification as a TaskManager template.
-	 *
-	 * <p>This code is extremely Mesos-specific and registers all the artifacts that the TaskManager
-	 * needs (such as JAR file, config file, ...) and all environment variables into a container specification.
-	 * The Mesos fetcher then ensures that those artifacts will be copied into the task's sandbox directory.
-	 * A lightweight HTTP server serves the artifacts to the fetcher.
-	 */
-	private static void applyOverlays(
-		Configuration globalConfiguration, ContainerSpecification containerSpec) throws IOException {
-
-		// create the overlays that will produce the specification
-		CompositeContainerOverlay overlay = new CompositeContainerOverlay(
-			FlinkDistributionOverlay.newBuilder().fromEnvironment(globalConfiguration).build(),
-			HadoopConfOverlay.newBuilder().fromEnvironment(globalConfiguration).build(),
-			HadoopUserOverlay.newBuilder().fromEnvironment(globalConfiguration).build(),
-			KeytabOverlay.newBuilder().fromEnvironment(globalConfiguration).build(),
-			Krb5ConfOverlay.newBuilder().fromEnvironment(globalConfiguration).build(),
-			SSLStoreOverlay.newBuilder().fromEnvironment(globalConfiguration).build()
-		);
-
-		// apply the overlays
-		overlay.configure(containerSpec);
-	}
-
-	private static void configureArtifactServer(MesosArtifactServer server, ContainerSpecification container) throws IOException {
-		// serve the artifacts associated with the container environment
-		for (ContainerSpecification.Artifact artifact : container.getArtifacts()) {
-			server.addPath(artifact.source, artifact.dest);
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index 736af59..445010b 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -20,6 +20,7 @@ package org.apache.flink.mesos.runtime.clusterframework;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
 import org.apache.flink.mesos.scheduler.ConnectionMonitor;
 import org.apache.flink.mesos.scheduler.LaunchCoordinator;
@@ -38,7 +39,7 @@ import org.apache.flink.mesos.scheduler.messages.Registered;
 import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
 import org.apache.flink.mesos.scheduler.messages.SlaveLost;
 import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
-import org.apache.flink.mesos.util.MesosArtifactResolver;
+import org.apache.flink.mesos.util.MesosArtifactServer;
 import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
@@ -75,6 +76,7 @@ import org.apache.mesos.SchedulerDriver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -98,17 +100,20 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 	/** The Mesos configuration (master and framework info). */
 	private final MesosConfiguration mesosConfig;
 
+	/** The Mesos services needed by the resource manager. */
+	private final MesosServices mesosServices;
+
 	/** The TaskManager container parameters (like container memory size). */
 	private final MesosTaskManagerParameters taskManagerParameters;
 
 	/** Container specification for launching a TM. */
 	private final ContainerSpecification taskManagerContainerSpec;
 
-	/** Resolver for HTTP artifacts. */
-	private final MesosArtifactResolver artifactResolver;
+	/** Server for HTTP artifacts. */
+	private final MesosArtifactServer artifactServer;
 
 	/** Persistent storage of allocated containers. */
-	private final MesosWorkerStore workerStore;
+	private MesosWorkerStore workerStore;
 
 	/** A local actor system for using the helper actors. */
 	private final ActorSystem actorSystem;
@@ -145,13 +150,11 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 			JobLeaderIdService jobLeaderIdService,
 			FatalErrorHandler fatalErrorHandler,
 			// Mesos specifics
-			ActorSystem actorSystem,
 			Configuration flinkConfig,
+			MesosServices mesosServices,
 			MesosConfiguration mesosConfig,
-			MesosWorkerStore workerStore,
 			MesosTaskManagerParameters taskManagerParameters,
-			ContainerSpecification taskManagerContainerSpec,
-			MesosArtifactResolver artifactResolver) {
+			ContainerSpecification taskManagerContainerSpec) {
 		super(
 			rpcService,
 			resourceManagerEndpointId,
@@ -164,13 +167,13 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 			jobLeaderIdService,
 			fatalErrorHandler);
 
-		this.actorSystem = Preconditions.checkNotNull(actorSystem);
+		this.mesosServices = Preconditions.checkNotNull(mesosServices);
+		this.actorSystem = Preconditions.checkNotNull(mesosServices.getLocalActorSystem());
 
 		this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
 		this.mesosConfig = Preconditions.checkNotNull(mesosConfig);
 
-		this.workerStore = Preconditions.checkNotNull(workerStore);
-		this.artifactResolver = Preconditions.checkNotNull(artifactResolver);
+		this.artifactServer = Preconditions.checkNotNull(mesosServices.getArtifactServer());
 
 		this.taskManagerParameters = Preconditions.checkNotNull(taskManagerParameters);
 		this.taskManagerContainerSpec = Preconditions.checkNotNull(taskManagerContainerSpec);
@@ -221,8 +224,9 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 	 */
 	@Override
 	protected void initialize() throws ResourceManagerException {
-		// start the worker store
+		// create and start the worker store
 		try {
+			this.workerStore = mesosServices.createMesosWorkerStore(flinkConfig, getRpcService().getExecutor());
 			workerStore.start();
 		} catch (Exception e) {
 			throw new ResourceManagerException("Unable to initialize the worker store.", e);
@@ -266,6 +270,14 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 			throw new ResourceManagerException("Unable to recover Mesos worker state.", e);
 		}
 
+		// configure the artifact server to serve the TM container artifacts
+		try {
+			LaunchableMesosWorker.configureArtifactServer(artifactServer, taskManagerContainerSpec);
+		}
+		catch (IOException e) {
+			throw new ResourceManagerException("Unable to configure the artifact server with TaskManager artifacts.", e);
+		}
+
 		// begin scheduling
 		connectionMonitor.tell(new ConnectionMonitor.Start(), selfActor);
 		schedulerDriver.start();
@@ -627,20 +639,23 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 			taskManagerParameters.containerType(),
 			taskManagerParameters.containerImageName(),
 			new ContaineredTaskManagerParameters(
-				resourceProfile.getMemoryInMB() < 0 ? taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB() : resourceProfile.getMemoryInMB(),
-				resourceProfile.getHeapMemoryInMB(),
-				resourceProfile.getDirectMemoryInMB(),
+				ResourceProfile.UNKNOWN.equals(resourceProfile) ? taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB() : resourceProfile.getMemoryInMB(),
+				ResourceProfile.UNKNOWN.equals(resourceProfile) ? taskManagerParameters.containeredParameters().taskManagerHeapSizeMB() : resourceProfile.getHeapMemoryInMB(),
+				ResourceProfile.UNKNOWN.equals(resourceProfile) ? taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB() : resourceProfile.getDirectMemoryInMB(),
 				1,
 				new HashMap<>(taskManagerParameters.containeredParameters().taskManagerEnv())),
 			taskManagerParameters.containerVolumes(),
 			taskManagerParameters.constraints(),
+			taskManagerParameters.command(),
 			taskManagerParameters.bootstrapCommand(),
 			taskManagerParameters.getTaskManagerHostname()
 		);
 
+		LOG.debug("LaunchableMesosWorker parameters: {}", params);
+
 		LaunchableMesosWorker launchable =
 			new LaunchableMesosWorker(
-				artifactResolver,
+				artifactServer,
 				params,
 				taskManagerContainerSpec,
 				taskID,

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
index f5a415e..3859913 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
@@ -74,6 +74,10 @@ public class MesosTaskManagerParameters {
 		key("mesos.resourcemanager.tasks.hostname")
 		.noDefaultValue();
 
+	public static final ConfigOption<String> MESOS_TM_CMD =
+		key("mesos.resourcemanager.tasks.taskmanager-cmd")
+		.defaultValue("$FLINK_HOME/bin/mesos-taskmanager.sh"); // internal
+
 	public static final ConfigOption<String> MESOS_TM_BOOTSTRAP_CMD =
 		key("mesos.resourcemanager.tasks.bootstrap-cmd")
 		.noDefaultValue();
@@ -107,6 +111,8 @@ public class MesosTaskManagerParameters {
 
 	private final List<ConstraintEvaluator> constraints;
 
+	private final String command;
+
 	private final Option<String> bootstrapCommand;
 
 	private final Option<String> taskManagerHostname;
@@ -118,6 +124,7 @@ public class MesosTaskManagerParameters {
 			ContaineredTaskManagerParameters containeredParameters,
 			List<Protos.Volume> containerVolumes,
 			List<ConstraintEvaluator> constraints,
+			String command,
 			Option<String> bootstrapCommand,
 			Option<String> taskManagerHostname) {
 
@@ -127,6 +134,7 @@ public class MesosTaskManagerParameters {
 		this.containeredParameters = Preconditions.checkNotNull(containeredParameters);
 		this.containerVolumes = Preconditions.checkNotNull(containerVolumes);
 		this.constraints = Preconditions.checkNotNull(constraints);
+		this.command = Preconditions.checkNotNull(command);
 		this.bootstrapCommand = Preconditions.checkNotNull(bootstrapCommand);
 		this.taskManagerHostname = Preconditions.checkNotNull(taskManagerHostname);
 	}
@@ -183,6 +191,13 @@ public class MesosTaskManagerParameters {
 	}
 
 	/**
+	 * Get the command.
+	 */
+	public String command() {
+		return command;
+	}
+
+	/**
 	 * Get the bootstrap command.
 	 */
 	public Option<String> bootstrapCommand() {
@@ -199,6 +214,7 @@ public class MesosTaskManagerParameters {
 			", containerVolumes=" + containerVolumes +
 			", constraints=" + constraints +
 			", taskManagerHostName=" + taskManagerHostname +
+			", command=" + command +
 			", bootstrapCommand=" + bootstrapCommand +
 			'}';
 	}
@@ -249,7 +265,8 @@ public class MesosTaskManagerParameters {
 		//obtain Task Manager Host Name from the configuration
 		Option<String> taskManagerHostname = Option.apply(flinkConfig.getString(MESOS_TM_HOSTNAME));
 
-		//obtain bootstrap command from the configuration
+		//obtain command-line from the configuration
+		String tmCommand = flinkConfig.getString(MESOS_TM_CMD);
 		Option<String> tmBootstrapCommand = Option.apply(flinkConfig.getString(MESOS_TM_BOOTSTRAP_CMD));
 
 		return new MesosTaskManagerParameters(
@@ -259,6 +276,7 @@ public class MesosTaskManagerParameters {
 			containeredParameters,
 			containerVolumes,
 			constraints,
+			tmCommand,
 			tmBootstrapCommand,
 			taskManagerHostname);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/AbstractMesosServices.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/AbstractMesosServices.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/AbstractMesosServices.java
new file mode 100644
index 0000000..e4f4cf7
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/AbstractMesosServices.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework.services;
+
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+
+import akka.actor.ActorSystem;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An abstract implementation of {@link MesosServices}.
+ *
+ * <p>It holds the local actor system and the artifact server that are handed in at
+ * construction time and stops both of them in {@link #close(boolean)}.
+ */
+public abstract class AbstractMesosServices implements MesosServices {
+
+	/** The local actor system handed out by {@link #getLocalActorSystem()}. */
+	private final ActorSystem localActorSystem;
+
+	/** The artifact server handed out by {@link #getArtifactServer()}. */
+	private final MesosArtifactServer server;
+
+	/**
+	 * Creates the services from the given components.
+	 *
+	 * @param actorSystem the local actor system, must not be null
+	 * @param artifactServer the artifact server, must not be null
+	 */
+	protected AbstractMesosServices(ActorSystem actorSystem, MesosArtifactServer artifactServer) {
+		this.localActorSystem = checkNotNull(actorSystem);
+		this.server = checkNotNull(artifactServer);
+	}
+
+	@Override
+	public ActorSystem getLocalActorSystem() {
+		return localActorSystem;
+	}
+
+	@Override
+	public MesosArtifactServer getArtifactServer() {
+		return server;
+	}
+
+	/**
+	 * Shuts down the actor system and stops the artifact server.
+	 *
+	 * <p>Both steps are always attempted; the failures are combined via
+	 * {@link ExceptionUtils#firstOrSuppressed(Throwable, Throwable)}. The {@code cleanup}
+	 * flag is not used by this implementation.
+	 *
+	 * @param cleanup is true if a cleanup shall be performed
+	 * @throws Exception if at least one of the two steps failed
+	 */
+	@Override
+	public void close(boolean cleanup) throws Exception {
+		Throwable firstFailure = null;
+
+		// step 1: the actor system
+		try {
+			localActorSystem.shutdown();
+		} catch (Throwable error) {
+			firstFailure = ExceptionUtils.firstOrSuppressed(error, firstFailure);
+		}
+
+		// step 2: the artifact server, attempted even if step 1 failed
+		try {
+			server.stop();
+		} catch (Throwable error) {
+			firstFailure = ExceptionUtils.firstOrSuppressed(error, firstFailure);
+		}
+
+		if (firstFailure == null) {
+			return;
+		}
+
+		throw new FlinkException("Could not properly shut down the Mesos services.", firstFailure);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServices.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServices.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServices.java
index 5655bfc..6a64f4f 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServices.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServices.java
@@ -20,6 +20,9 @@ package org.apache.flink.mesos.runtime.clusterframework.services;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+
+import akka.actor.ActorSystem;
 
 import java.util.concurrent.Executor;
 
@@ -42,6 +45,20 @@ public interface MesosServices {
 		Executor executor) throws Exception;
 
 	/**
+	 * Gets a local {@link ActorSystem} which is used for child actors within
+	 * {@link org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager}.
+	 *
+	 * @return a reference to an actor system.
+	 */
+	ActorSystem getLocalActorSystem();
+
+	/**
+	 * Gets the artifact server with which to serve essential resources to task managers.
+	 * @return a reference to an artifact server.
+	 */
+	MesosArtifactServer getArtifactServer();
+
+	/**
 	 * Closes all state maintained by the mesos services implementation.
 	 *
 	 * @param cleanup is true if a cleanup shall be performed

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
index 370a760..c5a8516 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
@@ -20,9 +20,16 @@ package org.apache.flink.mesos.runtime.clusterframework.services;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.mesos.configuration.MesosOptions;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.zookeeper.ZooKeeperUtilityFactory;
 
+import akka.actor.ActorSystem;
+
+import java.util.UUID;
+
 /**
  * Utilities for the {@link MesosServices}.
  */
@@ -32,15 +39,21 @@ public class MesosServicesUtils {
 	 * Creates a {@link MesosServices} instance depending on the high availability settings.
 	 *
 	 * @param configuration containing the high availability settings
+	 * @param hostname the hostname to advertise to remote clients
 	 * @return a mesos services instance
 	 * @throws Exception if the mesos services instance could not be created
 	 */
-	public static MesosServices createMesosServices(Configuration configuration) throws Exception {
+	public static MesosServices createMesosServices(Configuration configuration, String hostname) throws Exception {
+
+		ActorSystem localActorSystem = AkkaUtils.createLocalActorSystem(configuration);
+
+		MesosArtifactServer artifactServer = createArtifactServer(configuration, hostname);
+
 		HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(configuration);
 
 		switch (highAvailabilityMode) {
 			case NONE:
-				return new StandaloneMesosServices();
+				return new StandaloneMesosServices(localActorSystem, artifactServer);
 
 			case ZOOKEEPER:
 				final String zkMesosRootPath = configuration.getString(
@@ -50,10 +63,19 @@ public class MesosServicesUtils {
 					configuration,
 					zkMesosRootPath);
 
-				return new ZooKeeperMesosServices(zooKeeperUtilityFactory);
+				return new ZooKeeperMesosServices(localActorSystem, artifactServer, zooKeeperUtilityFactory);
 
 			default:
 				throw new Exception("High availability mode " + highAvailabilityMode + " is not supported.");
 		}
 	}
+
+	private static MesosArtifactServer createArtifactServer(Configuration configuration, String hostname) throws Exception { // builds the artifact server from the configured port and the given hostname
+		final int artifactServerPort = configuration.getInteger(MesosOptions.ARTIFACT_SERVER_PORT, 0); // NOTE(review): 0 is passed as the fallback value; presumably it lets the server pick a free port -- confirm
+
+		// a random prefix is affixed to artifact URLs to ensure uniqueness in the Mesos fetcher cache
+		final String artifactServerPrefix = UUID.randomUUID().toString();
+
+		return new MesosArtifactServer(artifactServerPrefix, hostname, artifactServerPort, configuration);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java
index aa3157f..b93fd29 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java
@@ -21,13 +21,20 @@ package org.apache.flink.mesos.runtime.clusterframework.services;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
 import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+
+import akka.actor.ActorSystem;
 
 import java.util.concurrent.Executor;
 
 /**
  * {@link MesosServices} implementation for the standalone mode.
  */
-public class StandaloneMesosServices implements MesosServices {
+public class StandaloneMesosServices extends AbstractMesosServices {
+
+	protected StandaloneMesosServices(ActorSystem actorSystem, MesosArtifactServer artifactServer) { // NOTE(review): protected, yet instantiated from MesosServicesUtils -- this relies on both classes sharing a package; the ZooKeeperMesosServices constructor is public
+		super(actorSystem, artifactServer);
+	}
 
 	@Override
 	public MesosWorkerStore createMesosWorkerStore(Configuration configuration, Executor executor) {
@@ -36,5 +43,6 @@ public class StandaloneMesosServices implements MesosServices {
 
 	@Override
 	public void close(boolean cleanup) throws Exception {
+		super.close(cleanup);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
index 2883e4f..069cb83 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
@@ -21,25 +21,31 @@ package org.apache.flink.mesos.runtime.clusterframework.services;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
 import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
+import org.apache.flink.mesos.util.MesosArtifactServer;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperSharedCount;
 import org.apache.flink.runtime.zookeeper.ZooKeeperSharedValue;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.runtime.zookeeper.ZooKeeperUtilityFactory;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
+import akka.actor.ActorSystem;
+
 import java.util.concurrent.Executor;
 
 /**
  * {@link MesosServices} implementation for the ZooKeeper high availability based mode.
  */
-public class ZooKeeperMesosServices implements MesosServices {
+public class ZooKeeperMesosServices extends AbstractMesosServices {
 
 	// Factory to create ZooKeeper utility classes
 	private final ZooKeeperUtilityFactory zooKeeperUtilityFactory;
 
-	public ZooKeeperMesosServices(ZooKeeperUtilityFactory zooKeeperUtilityFactory) {
+	public ZooKeeperMesosServices(ActorSystem actorSystem, MesosArtifactServer artifactServer, ZooKeeperUtilityFactory zooKeeperUtilityFactory) {
+		super(actorSystem, artifactServer);
 		this.zooKeeperUtilityFactory = Preconditions.checkNotNull(zooKeeperUtilityFactory);
 	}
 
@@ -64,7 +70,23 @@ public class ZooKeeperMesosServices implements MesosServices {
 
 	@Override
 	public void close(boolean cleanup) throws Exception {
-		// this also closes the underlying CuratorFramework instance
-		zooKeeperUtilityFactory.close(cleanup);
+		Throwable exception = null;
+
+		try {
+			// this also closes the underlying CuratorFramework instance
+			zooKeeperUtilityFactory.close(cleanup);
+		} catch (Throwable t) {
+			exception = ExceptionUtils.firstOrSuppressed(t, exception);
+		}
+
+		try {
+			super.close(cleanup);
+		} catch (Throwable t) {
+			exception = ExceptionUtils.firstOrSuppressed(t, exception);
+		}
+
+		if (exception != null) {
+			throw new FlinkException("Could not properly shut down the Mesos services.", exception);
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
index 8bfb4d1..ff32486 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
@@ -251,6 +251,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 				containeredParams,
 				Collections.<Protos.Volume>emptyList(),
 				Collections.<ConstraintEvaluator>emptyList(),
+				"",
 				Option.<String>empty(),
 				Option.<String>empty());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index e81a2de..4bbcb25 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.mesos.runtime.clusterframework;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
 import org.apache.flink.mesos.scheduler.ConnectionMonitor;
 import org.apache.flink.mesos.scheduler.LaunchCoordinator;
@@ -32,7 +33,7 @@ import org.apache.flink.mesos.scheduler.messages.ReRegistered;
 import org.apache.flink.mesos.scheduler.messages.Registered;
 import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
 import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
-import org.apache.flink.mesos.util.MesosArtifactResolver;
+import org.apache.flink.mesos.util.MesosArtifactServer;
 import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
@@ -159,17 +160,15 @@ public class MesosResourceManagerTest extends TestLogger {
 			FatalErrorHandler fatalErrorHandler,
 
 			// Mesos specifics
-			ActorSystem actorSystem,
 			Configuration flinkConfig,
+			MesosServices mesosServices,
 			MesosConfiguration mesosConfig,
-			MesosWorkerStore workerStore,
 			MesosTaskManagerParameters taskManagerParameters,
-			ContainerSpecification taskManagerContainerSpec,
-			MesosArtifactResolver artifactResolver) {
+			ContainerSpecification taskManagerContainerSpec) {
 			super(rpcService, resourceManagerEndpointId, resourceId, resourceManagerConfiguration,
 				highAvailabilityServices, heartbeatServices, slotManager, metricRegistry,
-				jobLeaderIdService, fatalErrorHandler, actorSystem, flinkConfig, mesosConfig, workerStore,
-				taskManagerParameters, taskManagerContainerSpec, artifactResolver);
+				jobLeaderIdService, fatalErrorHandler, flinkConfig, mesosServices, mesosConfig,
+				taskManagerParameters, taskManagerContainerSpec);
 		}
 
 		@Override
@@ -208,6 +207,7 @@ public class MesosResourceManagerTest extends TestLogger {
 		TestingRpcService rpcService;
 		TestingFatalErrorHandler fatalErrorHandler;
 		MockMesosResourceManagerRuntimeServices rmServices;
+		MockMesosServices mesosServices;
 
 		// RM
 		ResourceManagerConfiguration rmConfiguration;
@@ -242,6 +242,7 @@ public class MesosResourceManagerTest extends TestLogger {
 			rpcService = new TestingRpcService();
 			fatalErrorHandler = new TestingFatalErrorHandler();
 			rmServices = new MockMesosResourceManagerRuntimeServices();
+			mesosServices = new MockMesosServices();
 
 			// TaskExecutor templating
 			ContainerSpecification containerSpecification = new ContainerSpecification();
@@ -249,7 +250,7 @@ public class MesosResourceManagerTest extends TestLogger {
 				new ContaineredTaskManagerParameters(1024, 768, 256, 4, new HashMap<String, String>());
 			MesosTaskManagerParameters tmParams = new MesosTaskManagerParameters(
 				1.0, MesosTaskManagerParameters.ContainerType.MESOS, Option.<String>empty(), containeredParams,
-				Collections.<Protos.Volume>emptyList(), Collections.<ConstraintEvaluator>emptyList(), Option.<String>empty(),
+				Collections.<Protos.Volume>emptyList(), Collections.<ConstraintEvaluator>emptyList(), "", Option.<String>empty(),
 				Option.<String>empty());
 
 			// resource manager
@@ -270,13 +271,11 @@ public class MesosResourceManagerTest extends TestLogger {
 					rmServices.jobLeaderIdService,
 					fatalErrorHandler,
 					// Mesos specifics
-					system,
 					flinkConfig,
+					mesosServices,
 					rmServices.mesosConfig,
-					rmServices.workerStore,
 					tmParams,
-					containerSpecification,
-					rmServices.artifactResolver
+					containerSpecification
 				);
 
 			// TaskExecutors
@@ -341,7 +340,7 @@ public class MesosResourceManagerTest extends TestLogger {
 			public SchedulerDriver schedulerDriver;
 			public MesosConfiguration mesosConfig;
 			public MesosWorkerStore workerStore;
-			public MesosArtifactResolver artifactResolver;
+			public MesosArtifactServer artifactServer;
 
 			MockMesosResourceManagerRuntimeServices() throws Exception {
 				schedulerDriver = mock(SchedulerDriver.class);
@@ -354,7 +353,28 @@ public class MesosResourceManagerTest extends TestLogger {
 				workerStore = mock(MesosWorkerStore.class);
 				when(workerStore.getFrameworkID()).thenReturn(Option.<Protos.FrameworkID>empty());
 
-				artifactResolver = mock(MesosArtifactResolver.class);
+				artifactServer = mock(MesosArtifactServer.class);
+			}
+		}
+
+		class MockMesosServices implements MesosServices { // test double that hands out the fixtures of the enclosing test context
+			@Override
+			public MesosWorkerStore createMesosWorkerStore(Configuration configuration, Executor executor) throws Exception {
+				return rmServices.workerStore; // both arguments are ignored; the mocked worker store is returned
+			}
+
+			@Override
+			public ActorSystem getLocalActorSystem() {
+				return system;
+			}
+
+			@Override
+			public MesosArtifactServer getArtifactServer() {
+				return rmServices.artifactServer; // the mocked artifact server created in MockMesosResourceManagerRuntimeServices
+			}
+
+			@Override
+			public void close(boolean cleanup) throws Exception { // intentionally a no-op in this test double
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
old mode 100644
new mode 100755
index 2538f20..1551933
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
@@ -46,6 +47,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.GuardedBy;
 
+import java.io.IOException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 
@@ -92,6 +94,8 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		LOG.info("Starting {}.", getClass().getSimpleName());
 
 		try {
+			installDefaultFileSystem(configuration);
+
 			SecurityContext securityContext = installSecurityContext(configuration);
 
 			securityContext.runSecured(new Callable<Void>() {
@@ -115,6 +119,17 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		}
 	}
 
+	protected void installDefaultFileSystem(Configuration configuration) throws Exception { // called during startup, before the security context is installed
+		LOG.info("Install default filesystem.");
+
+		try {
+			FileSystem.setDefaultScheme(configuration);
+		} catch (IOException e) { // rethrown with a more descriptive message; the original exception is kept as the cause
+			throw new IOException("Error while setting the default " +
+				"filesystem scheme from configuration.", e);
+		}
+	}
+
 	protected SecurityContext installSecurityContext(Configuration configuration) throws Exception {
 		LOG.info("Install security context.");
 
@@ -184,9 +199,18 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 	}
 
 	protected void shutDown(boolean cleanupHaData) throws FlinkException {
+		LOG.info("Stopping {}.", getClass().getSimpleName());
+
 		Throwable exception = null;
 
 		synchronized (lock) {
+
+			try {
+				stopClusterComponents(cleanupHaData);
+			} catch (Throwable t) {
+				exception = ExceptionUtils.firstOrSuppressed(t, exception);
+			}
+
 			if (metricRegistry != null) {
 				try {
 					metricRegistry.shutdown();
@@ -244,6 +268,9 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		HeartbeatServices heartbeatServices,
 		MetricRegistry metricRegistry) throws Exception;
 
+	protected void stopClusterComponents(boolean cleanupHaData) throws Exception { // hook invoked first by shutDown(boolean) while holding the lock; empty by default, overridden e.g. by JobClusterEntrypoint
+	}
+
 	protected static ClusterConfiguration parseArguments(String[] args) {
 		ParameterTool parameterTool = ParameterTool.fromArgs(args);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index a7c6120..e70f6c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -110,7 +110,7 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 	}
 
 	@Override
-	protected void shutDown(boolean cleanupHaData) throws FlinkException {
+	protected void stopClusterComponents(boolean cleanupHaData) throws Exception {
 		Throwable exception = null;
 
 		if (jobManagerRunner != null) {
@@ -129,14 +129,8 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			}
 		}
 
-		try {
-			super.shutDown(cleanupHaData);
-		} catch (Throwable t) {
-			exception = ExceptionUtils.firstOrSuppressed(t, exception);
-		}
-
 		if (exception != null) {
-			throw new FlinkException("Could not properly shut down the session cluster entry point.", exception);
+			throw new FlinkException("Could not properly shut down the job cluster entry point.", exception);
 		}
 	}