You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/19 15:25:52 UTC
[2/2] flink git commit: [FLINK-6630] [FLINK-6631] Implement FLIP-6
Mesos cluster entrypoints + MesosTaskExecutorRunner
[FLINK-6630] [FLINK-6631] Implement FLIP-6 Mesos cluster entrypoints + MesosTaskExecutorRunner
- bin: new entrypoints scripts for flip-6
- ClusterEntrypoint: Refactor the shutdown method
- ClusterEntrypoint: Install default FileSystem (for parity with legacy entrypoints)
- ClusterEntrypoint: new MesosJobClusterEntrypoint, MesosSessionClusterEntrypoint, MesosEntrypointUtils, MesosTaskExecutorRunner
- MesosServices: enhanced with artifactServer, localActorSystem
- MesosResourceManager: Fallback to old TM params when UNKNOWN resource profile is provided
- MesosResourceManager: config setting for taskmanager startup script (mesos.resourcemanager.tasks.taskmanager-cmd)
- test: added a 'noop' job graph for testing purposes
This closes #4555.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bbac4a6c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bbac4a6c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bbac4a6c
Branch: refs/heads/master
Commit: bbac4a6c922199db08a5244d0fa1262a5f16d479
Parents: 76f1022
Author: Wright, Eron <er...@emc.com>
Authored: Wed Aug 16 14:30:24 2017 -0700
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sat Aug 19 17:25:21 2017 +0200
----------------------------------------------------------------------
.../mesos-bin/mesos-appmaster-flip6-job.sh | 47 +++++
.../mesos-bin/mesos-appmaster-flip6-session.sh | 47 +++++
.../mesos-bin/mesos-taskmanager-flip6.sh | 45 ++++
.../mesos/entrypoint/MesosEntrypointUtils.java | 177 ++++++++++++++++
.../entrypoint/MesosJobClusterEntrypoint.java | 204 +++++++++++++++++++
.../MesosSessionClusterEntrypoint.java | 178 ++++++++++++++++
.../entrypoint/MesosTaskExecutorRunner.java | 132 ++++++++++++
.../clusterframework/LaunchableMesosWorker.java | 30 ++-
.../MesosApplicationMasterRunner.java | 131 +-----------
.../clusterframework/MesosResourceManager.java | 47 +++--
.../MesosTaskManagerParameters.java | 20 +-
.../services/AbstractMesosServices.java | 73 +++++++
.../services/MesosServices.java | 17 ++
.../services/MesosServicesUtils.java | 28 ++-
.../services/StandaloneMesosServices.java | 10 +-
.../services/ZooKeeperMesosServices.java | 30 ++-
.../MesosFlinkResourceManagerTest.java | 1 +
.../MesosResourceManagerTest.java | 48 +++--
.../runtime/entrypoint/ClusterEntrypoint.java | 27 +++
.../entrypoint/JobClusterEntrypoint.java | 10 +-
.../entrypoint/SessionClusterEntrypoint.java | 8 +-
.../test/runtime/entrypoint/StreamingNoop.java | 60 ++++++
22 files changed, 1186 insertions(+), 184 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster-flip6-job.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster-flip6-job.sh b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster-flip6-job.sh
new file mode 100755
index 0000000..b21670a
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster-flip6-job.sh
@@ -0,0 +1,47 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+bin=$(dirname "$0")
+bin=$(cd "$bin"; pwd)
+
+# get Flink config
+. "$bin"/config.sh
+
+# Fall back to the current user when no identity string is configured.
+if [ -z "$FLINK_IDENT_STRING" ]; then
+    FLINK_IDENT_STRING="$USER"
+fi
+
+CC_CLASSPATH=$(manglePathList "$(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS")
+
+log="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-mesos-appmaster-${HOSTNAME}.log"
+# Keep each JVM option as one array element so paths containing spaces survive.
+log_setting=(
+    "-Dlog.file=${log}"
+    "-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j.properties"
+    "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback.xml"
+)
+
+export FLINK_CONF_DIR
+export FLINK_BIN_DIR
+export FLINK_LIB_DIR
+
+# exec replaces this shell with the JVM, so the JVM's exit status goes straight to the
+# caller; statements after exec would never run, hence no exit-code check follows.
+exec $JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH" "${log_setting[@]}" org.apache.flink.mesos.entrypoint.MesosJobClusterEntrypoint "$@"
http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster-flip6-session.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster-flip6-session.sh b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster-flip6-session.sh
new file mode 100755
index 0000000..b9e0f53
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster-flip6-session.sh
@@ -0,0 +1,47 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+bin=$(dirname "$0")
+bin=$(cd "$bin"; pwd)
+
+# get Flink config
+. "$bin"/config.sh
+
+# Fall back to the current user when no identity string is configured.
+if [ -z "$FLINK_IDENT_STRING" ]; then
+    FLINK_IDENT_STRING="$USER"
+fi
+
+CC_CLASSPATH=$(manglePathList "$(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS")
+
+log="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-mesos-appmaster-${HOSTNAME}.log"
+# Keep each JVM option as one array element so paths containing spaces survive.
+log_setting=(
+    "-Dlog.file=${log}"
+    "-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j.properties"
+    "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback.xml"
+)
+
+export FLINK_CONF_DIR
+export FLINK_BIN_DIR
+export FLINK_LIB_DIR
+
+# exec replaces this shell with the JVM, so the JVM's exit status goes straight to the
+# caller; statements after exec would never run, hence no exit-code check follows.
+exec $JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH" "${log_setting[@]}" org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint "$@"
http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager-flip6.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager-flip6.sh b/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager-flip6.sh
new file mode 100755
index 0000000..f251442
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager-flip6.sh
@@ -0,0 +1,45 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+# get Flink config
+. "$bin"/config.sh
+
+CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`
+
+log=flink-taskmanager.log
+log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml"
+
+# Add precomputed memory JVM options
+if [ -z "${FLINK_ENV_JAVA_OPTS_MEM}" ]; then
+ FLINK_ENV_JAVA_OPTS_MEM=""
+fi
+export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_MEM}"
+
+# Add TaskManager-specific JVM options
+export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"
+
+export FLINK_CONF_DIR
+export FLINK_BIN_DIR
+export FLINK_LIB_DIR
+
+exec $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner "$@"
+
http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
new file mode 100755
index 0000000..0d81ead
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.mesos.configuration.MesosOptions;
+import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.overlays.CompositeContainerOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.HadoopConfOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.HadoopUserOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Utils for Mesos entrypoints.
+ */
+public class MesosEntrypointUtils {
+
+	/**
+	 * Loads the global configuration and adds the dynamic properties parsed from
+	 * the given command line.
+	 *
+	 * @param cmd command line to parse for dynamic properties
+	 * @return Global configuration with dynamic properties set
+	 * @deprecated replace once FLINK-7269 has been merged
+	 */
+	@Deprecated
+	public static Configuration loadConfiguration(CommandLine cmd) {
+		// register the dynamic properties from the command-line, then load the global configuration
+		GlobalConfiguration.setDynamicProperties(BootstrapTools.parseDynamicProperties(cmd));
+		return GlobalConfiguration.loadConfiguration();
+	}
+
+	/**
+	 * Loads and validates the Mesos scheduler configuration.
+	 *
+	 * @param flinkConfig the global configuration.
+	 * @param hostname the hostname to advertise to the Mesos master.
+	 * @return the Mesos configuration built from the master URL, the framework info and the optional credential
+	 * @throws IllegalConfigurationException if the Mesos master URL is not configured
+	 */
+	public static MesosConfiguration createMesosSchedulerConfiguration(Configuration flinkConfig, String hostname) {
+		final Protos.FrameworkInfo.Builder framework = Protos.FrameworkInfo.newBuilder();
+		framework.setHostname(hostname);
+
+		if (!flinkConfig.contains(MesosOptions.MASTER_URL)) {
+			throw new IllegalConfigurationException(MesosOptions.MASTER_URL.key() + " must be configured.");
+		}
+		final String masterUrl = flinkConfig.getString(MesosOptions.MASTER_URL);
+
+		final int failoverSeconds = flinkConfig.getInteger(MesosOptions.FAILOVER_TIMEOUT_SECONDS);
+		final Duration failoverTimeout = FiniteDuration.apply(failoverSeconds, TimeUnit.SECONDS);
+		framework.setFailoverTimeout(failoverTimeout.toSeconds());
+
+		framework.setName(flinkConfig.getString(MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME));
+		framework.setRole(flinkConfig.getString(MesosOptions.RESOURCEMANAGER_FRAMEWORK_ROLE));
+		framework.setUser(flinkConfig.getString(MesosOptions.RESOURCEMANAGER_FRAMEWORK_USER));
+
+		// a credential is only created when a framework principal is configured
+		final Protos.Credential.Builder credential;
+		if (flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) {
+			framework.setPrincipal(flinkConfig.getString(MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL));
+
+			credential = Protos.Credential.newBuilder();
+			credential.setPrincipal(framework.getPrincipal());
+
+			// some environments use a side-channel to communicate the secret to Mesos,
+			// and thus don't set the 'secret' configuration setting
+			if (flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_SECRET)) {
+				credential.setSecret(flinkConfig.getString(MesosOptions.RESOURCEMANAGER_FRAMEWORK_SECRET));
+			}
+		} else {
+			credential = null;
+		}
+
+		return new MesosConfiguration(masterUrl, framework, scala.Option.apply(credential));
+	}
+
+	/**
+	 * Creates the Mesos TaskManager parameters from the configuration and logs the
+	 * resulting number of slots, memory sizes and cpus.
+	 *
+	 * @param configuration the Flink configuration to create the parameters from
+	 * @param log the logger used to report the parameters
+	 * @return the TaskManager parameters
+	 */
+	public static MesosTaskManagerParameters createTmParameters(Configuration configuration, Logger log) {
+		final MesosTaskManagerParameters tmParams = MesosTaskManagerParameters.create(configuration);
+
+		log.info("TaskManagers will be created with {} task slots",
+			tmParams.containeredParameters().numSlots());
+		log.info("TaskManagers will be started with container size {} MB, JVM heap size {} MB, " +
+			"JVM direct memory limit {} MB, {} cpus",
+			tmParams.containeredParameters().taskManagerTotalMemoryMB(),
+			tmParams.containeredParameters().taskManagerHeapSizeMB(),
+			tmParams.containeredParameters().taskManagerDirectMemoryLimitMB(),
+			tmParams.cpus());
+
+		return tmParams;
+	}
+
+	/**
+	 * Generates a container specification as a TaskManager template.
+	 *
+	 * @param configuration the Flink configuration the overlays are built from
+	 * @param dynamicProperties the AM dynamic configuration to propagate to the TM
+	 * @return the container specification conveying the artifacts/vars needed to launch a TM
+	 * @throws Exception if the container specification could not be created
+	 */
+	public static ContainerSpecification createContainerSpec(Configuration configuration, Configuration dynamicProperties)
+		throws Exception {
+		final ContainerSpecification containerSpec = new ContainerSpecification();
+		containerSpec.getDynamicConfiguration().addAll(dynamicProperties);
+		applyOverlays(configuration, containerSpec);
+		return containerSpec;
+	}
+
+	/**
+	 * Configures the given container specification with the overlays built from the environment.
+	 *
+	 * <p>This code is extremely Mesos-specific and registers all the artifacts that the TaskManager
+	 * needs (such as JAR file, config file, ...) and all environment variables into a container specification.
+	 * The Mesos fetcher then ensures that those artifacts will be copied into the task's sandbox directory.
+	 * A lightweight HTTP server serves the artifacts to the fetcher.
+	 *
+	 * @param configuration the Flink configuration the overlays are built from
+	 * @param containerSpec the container specification to configure
+	 * @throws IOException propagated from building or applying the overlays
+	 */
+	public static void applyOverlays(
+			Configuration configuration, ContainerSpecification containerSpec) throws IOException {
+
+		// compose the overlays that produce the specification and apply them in one step
+		new CompositeContainerOverlay(
+			FlinkDistributionOverlay.newBuilder().fromEnvironment(configuration).build(),
+			HadoopConfOverlay.newBuilder().fromEnvironment(configuration).build(),
+			HadoopUserOverlay.newBuilder().fromEnvironment(configuration).build(),
+			KeytabOverlay.newBuilder().fromEnvironment(configuration).build(),
+			Krb5ConfOverlay.newBuilder().fromEnvironment(configuration).build(),
+			SSLStoreOverlay.newBuilder().fromEnvironment(configuration).build()
+		).configure(containerSpec);
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
new file mode 100755
index 0000000..890c4a7
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager;
+import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Entry point for Mesos per-job clusters.
+ */
+public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
+
+ public static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
+
+ // ------------------------------------------------------------------------
+ // Command-line options
+ // ------------------------------------------------------------------------
+
+ private static final Options ALL_OPTIONS;
+
+ static {
+ ALL_OPTIONS =
+ new Options()
+ .addOption(BootstrapTools.newDynamicPropertiesOption());
+ }
+
+ private MesosConfiguration schedulerConfiguration;
+
+ private MesosServices mesosServices;
+
+ private MesosTaskManagerParameters taskManagerParameters;
+
+ private ContainerSpecification taskManagerContainerSpec;
+
+ public MesosJobClusterEntrypoint(Configuration config) {
+ super(config);
+ }
+
+ @Override
+ protected void initializeServices(Configuration config) throws Exception {
+ super.initializeServices(config);
+
+ final String hostname = config.getString(JobManagerOptions.ADDRESS);
+
+ // Mesos configuration
+ schedulerConfiguration = MesosEntrypointUtils.createMesosSchedulerConfiguration(config, hostname);
+
+ // services
+ mesosServices = MesosServicesUtils.createMesosServices(config, hostname);
+
+ // TM configuration
+ taskManagerParameters = MesosEntrypointUtils.createTmParameters(config, LOG);
+ taskManagerContainerSpec = MesosEntrypointUtils.createContainerSpec(config, GlobalConfiguration.getDynamicProperties());
+ }
+
+ @Override
+ protected void startClusterComponents(Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry) throws Exception {
+ super.startClusterComponents(configuration, rpcService, highAvailabilityServices, blobServer, heartbeatServices, metricRegistry);
+ }
+
+ @Override
+ protected ResourceManager<?> createResourceManager(
+ Configuration configuration,
+ ResourceID resourceId,
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ HeartbeatServices heartbeatServices,
+ MetricRegistry metricRegistry,
+ FatalErrorHandler fatalErrorHandler) throws Exception {
+ final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
+ final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
+ final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
+ rmServicesConfiguration,
+ highAvailabilityServices,
+ rpcService.getScheduledExecutor());
+
+ return new MesosResourceManager(
+ rpcService,
+ ResourceManager.RESOURCE_MANAGER_NAME,
+ resourceId,
+ rmConfiguration,
+ highAvailabilityServices,
+ heartbeatServices,
+ rmRuntimeServices.getSlotManager(),
+ metricRegistry,
+ rmRuntimeServices.getJobLeaderIdService(),
+ fatalErrorHandler,
+ configuration,
+ mesosServices,
+ schedulerConfiguration,
+ taskManagerParameters,
+ taskManagerContainerSpec
+ );
+ }
+
+ @Override
+ protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException {
+ String jobGraphFile = configuration.getString(JOB_GRAPH_FILE_PATH, "job.graph");
+ File fp = new File(jobGraphFile);
+
+ try (FileInputStream input = new FileInputStream(fp);
+ ObjectInputStream obInput = new ObjectInputStream(input)) {
+
+ return (JobGraph) obInput.readObject();
+ } catch (FileNotFoundException e) {
+ throw new FlinkException("Could not find the JobGraph file.", e);
+ } catch (ClassNotFoundException | IOException e) {
+ throw new FlinkException("Could not load the JobGraph from file.", e);
+ }
+ }
+
+ @Override
+ protected void stopClusterComponents(boolean cleanupHaData) throws Exception {
+ Throwable exception = null;
+
+ try {
+ super.stopClusterComponents(cleanupHaData);
+ } catch (Throwable t) {
+ exception = ExceptionUtils.firstOrSuppressed(t, exception);
+ }
+
+ if (mesosServices != null) {
+ try {
+ mesosServices.close(cleanupHaData);
+ } catch (Throwable t) {
+ exception = ExceptionUtils.firstOrSuppressed(t, exception);
+ }
+ }
+
+ if (exception != null) {
+ throw new FlinkException("Could not properly shut down the Mesos job cluster entry point.", exception);
+ }
+ }
+
+ public static void main(String[] args) {
+ // load configuration incl. dynamic properties
+ CommandLineParser parser = new PosixParser();
+ CommandLine cmd;
+ try {
+ cmd = parser.parse(ALL_OPTIONS, args);
+ }
+ catch (Exception e){
+ LOG.error("Could not parse the command-line options.", e);
+ System.exit(STARTUP_FAILURE_RETURN_CODE);
+ return;
+ }
+
+ Configuration configuration = MesosEntrypointUtils.loadConfiguration(cmd);
+
+ MesosJobClusterEntrypoint clusterEntrypoint = new MesosJobClusterEntrypoint(configuration);
+
+ clusterEntrypoint.startCluster();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
new file mode 100755
index 0000000..67f5899
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager;
+import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+
+/**
+ * Entry point for Mesos session clusters.
+ */
+public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint {
+
+ // ------------------------------------------------------------------------
+ // Command-line options
+ // ------------------------------------------------------------------------
+
+ private static final Options ALL_OPTIONS;
+
+ static {
+ ALL_OPTIONS =
+ new Options()
+ .addOption(BootstrapTools.newDynamicPropertiesOption());
+ }
+
+ private MesosConfiguration mesosConfig;
+
+ private MesosServices mesosServices;
+
+ private MesosTaskManagerParameters taskManagerParameters;
+
+ private ContainerSpecification taskManagerContainerSpec;
+
+ public MesosSessionClusterEntrypoint(Configuration config) {
+ super(config);
+ }
+
+ @Override
+ protected void initializeServices(Configuration config) throws Exception {
+ super.initializeServices(config);
+
+ final String hostname = config.getString(JobManagerOptions.ADDRESS);
+
+ // Mesos configuration
+ mesosConfig = MesosEntrypointUtils.createMesosSchedulerConfiguration(config, hostname);
+
+ // services
+ mesosServices = MesosServicesUtils.createMesosServices(config, hostname);
+
+ // TM configuration
+ taskManagerParameters = MesosEntrypointUtils.createTmParameters(config, LOG);
+ taskManagerContainerSpec = MesosEntrypointUtils.createContainerSpec(config, GlobalConfiguration.getDynamicProperties());
+ }
+
+ @Override
+ protected void startClusterComponents(Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry) throws Exception {
+ super.startClusterComponents(configuration, rpcService, highAvailabilityServices, blobServer, heartbeatServices, metricRegistry);
+ }
+
+ @Override
+ protected ResourceManager<?> createResourceManager(
+ Configuration configuration,
+ ResourceID resourceId,
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ HeartbeatServices heartbeatServices,
+ MetricRegistry metricRegistry,
+ FatalErrorHandler fatalErrorHandler) throws Exception {
+ final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
+ final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
+ final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
+ rmServicesConfiguration,
+ highAvailabilityServices,
+ rpcService.getScheduledExecutor());
+
+ return new MesosResourceManager(
+ rpcService,
+ ResourceManager.RESOURCE_MANAGER_NAME,
+ resourceId,
+ rmConfiguration,
+ highAvailabilityServices,
+ heartbeatServices,
+ rmRuntimeServices.getSlotManager(),
+ metricRegistry,
+ rmRuntimeServices.getJobLeaderIdService(),
+ fatalErrorHandler,
+ configuration,
+ mesosServices,
+ mesosConfig,
+ taskManagerParameters,
+ taskManagerContainerSpec
+ );
+ }
+
+ @Override
+ protected void stopClusterComponents(boolean cleanupHaData) throws Exception {
+ Throwable exception = null;
+
+ try {
+ super.stopClusterComponents(cleanupHaData);
+ } catch (Throwable t) {
+ exception = t;
+ }
+
+ if (mesosServices != null) {
+ try {
+ mesosServices.close(cleanupHaData);
+ } catch (Throwable t) {
+ exception = t;
+ }
+ }
+
+ if (exception != null) {
+ throw new FlinkException("Could not properly shut down the Mesos session cluster entry point.", exception);
+ }
+ }
+
+ public static void main(String[] args) {
+ // load configuration incl. dynamic properties
+ CommandLineParser parser = new PosixParser();
+ CommandLine cmd;
+ try {
+ cmd = parser.parse(ALL_OPTIONS, args);
+ }
+ catch (Exception e){
+ LOG.error("Could not parse the command-line options.", e);
+ System.exit(STARTUP_FAILURE_RETURN_CODE);
+ return;
+ }
+
+ Configuration configuration = MesosEntrypointUtils.loadConfiguration(cmd);
+
+ MesosSessionClusterEntrypoint clusterEntrypoint = new MesosSessionClusterEntrypoint(configuration);
+
+ clusterEntrypoint.startCluster();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
new file mode 100644
index 0000000..c4343d2
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.entrypoint;
+
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.mesos.runtime.clusterframework.MesosConfigKeys;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+/**
+ * The entry point for running a TaskManager in a Mesos container.
+ *
+ * <p>The runner loads the Flink configuration (including dynamic properties given on the command
+ * line), applies settings passed through environment variables, installs the security context and
+ * finally runs the TaskManager inside that context. Failures during initialization are logged and
+ * terminate the process with {@link #INIT_ERROR_EXIT_CODE}.
+ */
+public class MesosTaskExecutorRunner {
+
+	private static final Logger LOG = LoggerFactory.getLogger(MesosTaskExecutorRunner.class);
+
+	/** Process exit code used when the TaskManager could not be initialized or started. */
+	private static final int INIT_ERROR_EXIT_CODE = 31;
+
+	/** The command-line options accepted by this entry point (the dynamic properties option). */
+	private static final Options ALL_OPTIONS;
+
+	static {
+		ALL_OPTIONS =
+			new Options()
+				.addOption(BootstrapTools.newDynamicPropertiesOption());
+	}
+
+	/**
+	 * Starts the TaskManager.
+	 *
+	 * @param args the command-line arguments, parsed against {@link #ALL_OPTIONS}
+	 * @throws Exception if the default filesystem scheme cannot be set, the container ID environment
+	 *                   variable is missing, or the security context cannot be installed
+	 */
+	public static void main(String[] args) throws Exception {
+		EnvironmentInformation.logEnvironmentInfo(LOG, MesosTaskExecutorRunner.class.getSimpleName(), args);
+		SignalHandler.register(LOG);
+		JvmShutdownSafeguard.installAsShutdownHook(LOG);
+
+		// try to parse the command line arguments; a failure is logged and reported via the
+		// init-error exit code, like the configuration loading failure below
+		final CommandLineParser parser = new PosixParser();
+		final CommandLine cmd;
+		try {
+			cmd = parser.parse(ALL_OPTIONS, args);
+		}
+		catch (Throwable t) {
+			LOG.error("Could not parse the command-line options.", t);
+			System.exit(INIT_ERROR_EXIT_CODE);
+			return;
+		}
+
+		final Configuration configuration;
+		try {
+			configuration = MesosEntrypointUtils.loadConfiguration(cmd);
+		}
+		catch (Throwable t) {
+			LOG.error("Failed to load the TaskManager configuration and dynamic properties.", t);
+			System.exit(INIT_ERROR_EXIT_CODE);
+			return;
+		}
+
+		// read the environment variables
+		final Map<String, String> envs = System.getenv();
+		final String tmpDirs = envs.get(MesosConfigKeys.ENV_FLINK_TMP_DIR);
+
+		// configure local directory: an explicit Flink setting wins over the environment variable
+		String flinkTempDirs = configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null);
+		if (flinkTempDirs != null) {
+			LOG.info("Overriding Mesos temporary file directories with those " +
+				"specified in the Flink config: {}", flinkTempDirs);
+		}
+		else if (tmpDirs != null) {
+			LOG.info("Setting directories for temporary files to: {}", tmpDirs);
+			configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, tmpDirs);
+		}
+
+		// configure the default filesystem
+		try {
+			FileSystem.setDefaultScheme(configuration);
+		} catch (IOException e) {
+			throw new IOException("Error while setting the default " +
+				"filesystem scheme from configuration.", e);
+		}
+
+		// tell akka to die in case of an error
+		configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
+
+		// Infer the resource identifier from the environment variable
+		String containerID = Preconditions.checkNotNull(
+			envs.get(MesosConfigKeys.ENV_FLINK_CONTAINER_ID),
+			"The environment variable " + MesosConfigKeys.ENV_FLINK_CONTAINER_ID +
+				" is not set; cannot infer the ResourceID of this container.");
+		final ResourceID resourceId = new ResourceID(containerID);
+		LOG.info("ResourceID assigned for this container: {}", resourceId);
+
+		// Run the TM in the security context
+		SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(configuration);
+		SecurityUtils.install(sc);
+
+		try {
+			SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
+				@Override
+				public Integer call() throws Exception {
+					TaskManagerRunner.runTaskManager(configuration, resourceId);
+
+					return 0;
+				}
+			});
+		}
+		catch (Throwable t) {
+			LOG.error("Error while starting the TaskManager", t);
+			System.exit(INIT_ERROR_EXIT_CODE);
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index ce7bb9d..2c32507 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.mesos.Utils;
import org.apache.flink.mesos.scheduler.LaunchableTask;
import org.apache.flink.mesos.util.MesosArtifactResolver;
+import org.apache.flink.mesos.util.MesosArtifactServer;
import org.apache.flink.mesos.util.MesosConfiguration;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -36,6 +37,7 @@ import org.apache.mesos.Protos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -261,12 +263,15 @@ public class LaunchableMesosWorker implements LaunchableTask {
env.addVariables(variable(MesosConfigKeys.ENV_FRAMEWORK_NAME, mesosConfiguration.frameworkInfo().getName()));
// build the launch command w/ dynamic application properties
- Option<String> bootstrapCmdOption = params.bootstrapCommand();
-
- final String bootstrapCommand = bootstrapCmdOption.isDefined() ? bootstrapCmdOption.get() + " && " : "";
- final String launchCommand = bootstrapCommand + "$FLINK_HOME/bin/mesos-taskmanager.sh " + ContainerSpecification.formatSystemProperties(dynamicProperties);
-
- cmd.setValue(launchCommand);
+ StringBuilder launchCommand = new StringBuilder();
+ if (params.bootstrapCommand().isDefined()) {
+ launchCommand.append(params.bootstrapCommand().get()).append(" && ");
+ }
+ launchCommand
+ .append(params.command())
+ .append(" ")
+ .append(ContainerSpecification.formatSystemProperties(dynamicProperties));
+ cmd.setValue(launchCommand.toString());
// build the container info
Protos.ContainerInfo.Builder containerInfo = Protos.ContainerInfo.newBuilder();
@@ -312,4 +317,17 @@ public class LaunchableMesosWorker implements LaunchableTask {
"taskRequest=" + taskRequest +
'}';
}
+
+	/**
+	 * Registers all artifacts of a container specification with an artifact server.
+	 *
+	 * @param server the artifact server to which the artifacts are added.
+	 * @param container the container specification whose artifacts are to be served.
+	 * @throws IOException if the artifacts cannot be accessed.
+	 */
+	static void configureArtifactServer(MesosArtifactServer server, ContainerSpecification container) throws IOException {
+		// hand the source and destination of every container artifact to the server
+		for (ContainerSpecification.Artifact containerArtifact : container.getArtifacts()) {
+			server.addPath(containerArtifact.source, containerArtifact.dest);
+		}
+	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
old mode 100644
new mode 100755
index 7891386..3d16a66
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -21,11 +21,10 @@ package org.apache.flink.mesos.runtime.clusterframework;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.mesos.configuration.MesosOptions;
+import org.apache.flink.mesos.entrypoint.MesosEntrypointUtils;
import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
@@ -34,13 +33,6 @@ import org.apache.flink.mesos.util.MesosConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
-import org.apache.flink.runtime.clusterframework.overlays.CompositeContainerOverlay;
-import org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay;
-import org.apache.flink.runtime.clusterframework.overlays.HadoopConfOverlay;
-import org.apache.flink.runtime.clusterframework.overlays.HadoopUserOverlay;
-import org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay;
-import org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay;
-import org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobmanager.JobManager;
@@ -65,7 +57,6 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
-import org.apache.mesos.Protos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,7 +64,6 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.URL;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -81,7 +71,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import scala.Option;
-import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import static org.apache.flink.util.Preconditions.checkState;
@@ -222,7 +211,7 @@ public class MesosApplicationMasterRunner {
LOG.info("App Master Hostname to use: {}", appMasterHostname);
// Mesos configuration
- final MesosConfiguration mesosConfig = createMesosConfig(config, appMasterHostname);
+ final MesosConfiguration mesosConfig = MesosEntrypointUtils.createMesosSchedulerConfiguration(config, appMasterHostname);
// JM configuration
int numberProcessors = Hardware.getNumberCPUCores();
@@ -235,19 +224,10 @@ public class MesosApplicationMasterRunner {
numberProcessors,
new ExecutorThreadFactory("mesos-jobmanager-io"));
- mesosServices = MesosServicesUtils.createMesosServices(config);
+ mesosServices = MesosServicesUtils.createMesosServices(config, appMasterHostname);
// TM configuration
- final MesosTaskManagerParameters taskManagerParameters = MesosTaskManagerParameters.create(config);
-
- LOG.info("TaskManagers will be created with {} task slots",
- taskManagerParameters.containeredParameters().numSlots());
- LOG.info("TaskManagers will be started with container size {} MB, JVM heap size {} MB, " +
- "JVM direct memory limit {} MB, {} cpus",
- taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB(),
- taskManagerParameters.containeredParameters().taskManagerHeapSizeMB(),
- taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB(),
- taskManagerParameters.cpus());
+ final MesosTaskManagerParameters taskManagerParameters = MesosEntrypointUtils.createTmParameters(config, LOG);
// JM endpoint, which should be explicitly configured based on acquired net resources
final int listeningPort = config.getInteger(JobManagerOptions.PORT);
@@ -268,9 +248,7 @@ public class MesosApplicationMasterRunner {
// try to start the artifact server
LOG.debug("Starting Artifact Server");
- final int artifactServerPort = config.getInteger(MesosOptions.ARTIFACT_SERVER_PORT);
- final String artifactServerPrefix = UUID.randomUUID().toString();
- artifactServer = new MesosArtifactServer(artifactServerPrefix, akkaHostname, artifactServerPort, config);
+ artifactServer = mesosServices.getArtifactServer();
// ----------------- (3) Generate the configuration for the TaskManagers -------------------
@@ -287,10 +265,10 @@ public class MesosApplicationMasterRunner {
taskManagerContainerSpec.getDynamicConfiguration().addAll(taskManagerConfig);
// apply the overlays
- applyOverlays(config, taskManagerContainerSpec);
+ MesosEntrypointUtils.applyOverlays(config, taskManagerContainerSpec);
// configure the artifact server to serve the specified artifacts
- configureArtifactServer(artifactServer, taskManagerContainerSpec);
+ LaunchableMesosWorker.configureArtifactServer(artifactServer, taskManagerContainerSpec);
// ----------------- (4) start the actors -------------------
@@ -386,14 +364,6 @@ public class MesosApplicationMasterRunner {
}
}
- if (artifactServer != null) {
- try {
- artifactServer.stop();
- } catch (Throwable ignored) {
- LOG.error("Failed to stop the artifact server", ignored);
- }
- }
-
if (actorSystem != null) {
try {
actorSystem.shutdown();
@@ -444,12 +414,6 @@ public class MesosApplicationMasterRunner {
}
}
- try {
- artifactServer.stop();
- } catch (Throwable t) {
- LOG.error("Failed to stop the artifact server", t);
- }
-
if (highAvailabilityServices != null) {
try {
highAvailabilityServices.close();
@@ -490,85 +454,4 @@ public class MesosApplicationMasterRunner {
return MemoryArchivist.class;
}
- /**
- * Loads and validates the ResourceManager Mesos configuration from the given Flink configuration.
- */
- public static MesosConfiguration createMesosConfig(Configuration flinkConfig, String hostname) {
-
- Protos.FrameworkInfo.Builder frameworkInfo = Protos.FrameworkInfo.newBuilder()
- .setHostname(hostname);
- Protos.Credential.Builder credential = null;
-
- if (!flinkConfig.contains(MesosOptions.MASTER_URL)) {
- throw new IllegalConfigurationException(MesosOptions.MASTER_URL.key() + " must be configured.");
- }
- String masterUrl = flinkConfig.getString(MesosOptions.MASTER_URL);
-
- Duration failoverTimeout = FiniteDuration.apply(
- flinkConfig.getInteger(
- MesosOptions.FAILOVER_TIMEOUT_SECONDS),
- TimeUnit.SECONDS);
- frameworkInfo.setFailoverTimeout(failoverTimeout.toSeconds());
-
- frameworkInfo.setName(flinkConfig.getString(
- MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME));
-
- frameworkInfo.setRole(flinkConfig.getString(
- MesosOptions.RESOURCEMANAGER_FRAMEWORK_ROLE));
-
- frameworkInfo.setUser(flinkConfig.getString(
- MesosOptions.RESOURCEMANAGER_FRAMEWORK_USER));
-
- if (flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) {
- frameworkInfo.setPrincipal(flinkConfig.getString(
- MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL));
-
- credential = Protos.Credential.newBuilder();
- credential.setPrincipal(frameworkInfo.getPrincipal());
-
- // some environments use a side-channel to communicate the secret to Mesos,
- // and thus don't set the 'secret' configuration setting
- if (flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_SECRET)) {
- credential.setSecret(flinkConfig.getString(
- MesosOptions.RESOURCEMANAGER_FRAMEWORK_SECRET));
- }
- }
-
- MesosConfiguration mesos =
- new MesosConfiguration(masterUrl, frameworkInfo, scala.Option.apply(credential));
-
- return mesos;
- }
-
- /**
- * Generate a container specification as a TaskManager template.
- *
- * <p>This code is extremely Mesos-specific and registers all the artifacts that the TaskManager
- * needs (such as JAR file, config file, ...) and all environment variables into a container specification.
- * The Mesos fetcher then ensures that those artifacts will be copied into the task's sandbox directory.
- * A lightweight HTTP server serves the artifacts to the fetcher.
- */
- private static void applyOverlays(
- Configuration globalConfiguration, ContainerSpecification containerSpec) throws IOException {
-
- // create the overlays that will produce the specification
- CompositeContainerOverlay overlay = new CompositeContainerOverlay(
- FlinkDistributionOverlay.newBuilder().fromEnvironment(globalConfiguration).build(),
- HadoopConfOverlay.newBuilder().fromEnvironment(globalConfiguration).build(),
- HadoopUserOverlay.newBuilder().fromEnvironment(globalConfiguration).build(),
- KeytabOverlay.newBuilder().fromEnvironment(globalConfiguration).build(),
- Krb5ConfOverlay.newBuilder().fromEnvironment(globalConfiguration).build(),
- SSLStoreOverlay.newBuilder().fromEnvironment(globalConfiguration).build()
- );
-
- // apply the overlays
- overlay.configure(containerSpec);
- }
-
- private static void configureArtifactServer(MesosArtifactServer server, ContainerSpecification container) throws IOException {
- // serve the artifacts associated with the container environment
- for (ContainerSpecification.Artifact artifact : container.getArtifacts()) {
- server.addPath(artifact.source, artifact.dest);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index 736af59..445010b 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -20,6 +20,7 @@ package org.apache.flink.mesos.runtime.clusterframework;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
import org.apache.flink.mesos.scheduler.ConnectionMonitor;
import org.apache.flink.mesos.scheduler.LaunchCoordinator;
@@ -38,7 +39,7 @@ import org.apache.flink.mesos.scheduler.messages.Registered;
import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
import org.apache.flink.mesos.scheduler.messages.SlaveLost;
import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
-import org.apache.flink.mesos.util.MesosArtifactResolver;
+import org.apache.flink.mesos.util.MesosArtifactServer;
import org.apache.flink.mesos.util.MesosConfiguration;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
@@ -75,6 +76,7 @@ import org.apache.mesos.SchedulerDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -98,17 +100,20 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
/** The Mesos configuration (master and framework info). */
private final MesosConfiguration mesosConfig;
+ /** The Mesos services needed by the resource manager. */
+ private final MesosServices mesosServices;
+
/** The TaskManager container parameters (like container memory size). */
private final MesosTaskManagerParameters taskManagerParameters;
/** Container specification for launching a TM. */
private final ContainerSpecification taskManagerContainerSpec;
- /** Resolver for HTTP artifacts. */
- private final MesosArtifactResolver artifactResolver;
+ /** Server for HTTP artifacts. */
+ private final MesosArtifactServer artifactServer;
/** Persistent storage of allocated containers. */
- private final MesosWorkerStore workerStore;
+ private MesosWorkerStore workerStore;
/** A local actor system for using the helper actors. */
private final ActorSystem actorSystem;
@@ -145,13 +150,11 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
JobLeaderIdService jobLeaderIdService,
FatalErrorHandler fatalErrorHandler,
// Mesos specifics
- ActorSystem actorSystem,
Configuration flinkConfig,
+ MesosServices mesosServices,
MesosConfiguration mesosConfig,
- MesosWorkerStore workerStore,
MesosTaskManagerParameters taskManagerParameters,
- ContainerSpecification taskManagerContainerSpec,
- MesosArtifactResolver artifactResolver) {
+ ContainerSpecification taskManagerContainerSpec) {
super(
rpcService,
resourceManagerEndpointId,
@@ -164,13 +167,13 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
jobLeaderIdService,
fatalErrorHandler);
- this.actorSystem = Preconditions.checkNotNull(actorSystem);
+ this.mesosServices = Preconditions.checkNotNull(mesosServices);
+ this.actorSystem = Preconditions.checkNotNull(mesosServices.getLocalActorSystem());
this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
this.mesosConfig = Preconditions.checkNotNull(mesosConfig);
- this.workerStore = Preconditions.checkNotNull(workerStore);
- this.artifactResolver = Preconditions.checkNotNull(artifactResolver);
+ this.artifactServer = Preconditions.checkNotNull(mesosServices.getArtifactServer());
this.taskManagerParameters = Preconditions.checkNotNull(taskManagerParameters);
this.taskManagerContainerSpec = Preconditions.checkNotNull(taskManagerContainerSpec);
@@ -221,8 +224,9 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
*/
@Override
protected void initialize() throws ResourceManagerException {
- // start the worker store
+ // create and start the worker store
try {
+ this.workerStore = mesosServices.createMesosWorkerStore(flinkConfig, getRpcService().getExecutor());
workerStore.start();
} catch (Exception e) {
throw new ResourceManagerException("Unable to initialize the worker store.", e);
@@ -266,6 +270,14 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
throw new ResourceManagerException("Unable to recover Mesos worker state.", e);
}
+ // configure the artifact server to serve the TM container artifacts
+ try {
+ LaunchableMesosWorker.configureArtifactServer(artifactServer, taskManagerContainerSpec);
+ }
+ catch (IOException e) {
+ throw new ResourceManagerException("Unable to configure the artifact server with TaskManager artifacts.", e);
+ }
+
// begin scheduling
connectionMonitor.tell(new ConnectionMonitor.Start(), selfActor);
schedulerDriver.start();
@@ -627,20 +639,23 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
taskManagerParameters.containerType(),
taskManagerParameters.containerImageName(),
new ContaineredTaskManagerParameters(
- resourceProfile.getMemoryInMB() < 0 ? taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB() : resourceProfile.getMemoryInMB(),
- resourceProfile.getHeapMemoryInMB(),
- resourceProfile.getDirectMemoryInMB(),
+ ResourceProfile.UNKNOWN.equals(resourceProfile) ? taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB() : resourceProfile.getMemoryInMB(),
+ ResourceProfile.UNKNOWN.equals(resourceProfile) ? taskManagerParameters.containeredParameters().taskManagerHeapSizeMB() : resourceProfile.getHeapMemoryInMB(),
+ ResourceProfile.UNKNOWN.equals(resourceProfile) ? taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB() : resourceProfile.getDirectMemoryInMB(),
1,
new HashMap<>(taskManagerParameters.containeredParameters().taskManagerEnv())),
taskManagerParameters.containerVolumes(),
taskManagerParameters.constraints(),
+ taskManagerParameters.command(),
taskManagerParameters.bootstrapCommand(),
taskManagerParameters.getTaskManagerHostname()
);
+ LOG.debug("LaunchableMesosWorker parameters: {}", params);
+
LaunchableMesosWorker launchable =
new LaunchableMesosWorker(
- artifactResolver,
+ artifactServer,
params,
taskManagerContainerSpec,
taskID,
http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
index f5a415e..3859913 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
@@ -74,6 +74,10 @@ public class MesosTaskManagerParameters {
key("mesos.resourcemanager.tasks.hostname")
.noDefaultValue();
+ /**
+ * The command with which the TaskManager is started in the Mesos task. The default is the
+ * script that was previously hard-coded into the launch command.
+ */
+ public static final ConfigOption<String> MESOS_TM_CMD =
+ key("mesos.resourcemanager.tasks.taskmanager-cmd")
+ .defaultValue("$FLINK_HOME/bin/mesos-taskmanager.sh"); // internal
+
public static final ConfigOption<String> MESOS_TM_BOOTSTRAP_CMD =
key("mesos.resourcemanager.tasks.bootstrap-cmd")
.noDefaultValue();
@@ -107,6 +111,8 @@ public class MesosTaskManagerParameters {
private final List<ConstraintEvaluator> constraints;
+ private final String command;
+
private final Option<String> bootstrapCommand;
private final Option<String> taskManagerHostname;
@@ -118,6 +124,7 @@ public class MesosTaskManagerParameters {
ContaineredTaskManagerParameters containeredParameters,
List<Protos.Volume> containerVolumes,
List<ConstraintEvaluator> constraints,
+ String command,
Option<String> bootstrapCommand,
Option<String> taskManagerHostname) {
@@ -127,6 +134,7 @@ public class MesosTaskManagerParameters {
this.containeredParameters = Preconditions.checkNotNull(containeredParameters);
this.containerVolumes = Preconditions.checkNotNull(containerVolumes);
this.constraints = Preconditions.checkNotNull(constraints);
+ this.command = Preconditions.checkNotNull(command);
this.bootstrapCommand = Preconditions.checkNotNull(bootstrapCommand);
this.taskManagerHostname = Preconditions.checkNotNull(taskManagerHostname);
}
@@ -183,6 +191,13 @@ public class MesosTaskManagerParameters {
}
 /**
+ * Get the command with which the TaskManager is launched. When the launch command of the
+ * Mesos task is built, it is appended after the optional bootstrap command.
+ */
+ public String command() {
+ return command;
+ }
+
+ /**
* Get the bootstrap command.
*/
public Option<String> bootstrapCommand() {
@@ -199,6 +214,7 @@ public class MesosTaskManagerParameters {
", containerVolumes=" + containerVolumes +
", constraints=" + constraints +
", taskManagerHostName=" + taskManagerHostname +
+ ", command=" + command +
", bootstrapCommand=" + bootstrapCommand +
'}';
}
@@ -249,7 +265,8 @@ public class MesosTaskManagerParameters {
//obtain Task Manager Host Name from the configuration
Option<String> taskManagerHostname = Option.apply(flinkConfig.getString(MESOS_TM_HOSTNAME));
- //obtain bootstrap command from the configuration
+ //obtain command-line from the configuration
+ String tmCommand = flinkConfig.getString(MESOS_TM_CMD);
Option<String> tmBootstrapCommand = Option.apply(flinkConfig.getString(MESOS_TM_BOOTSTRAP_CMD));
return new MesosTaskManagerParameters(
@@ -259,6 +276,7 @@ public class MesosTaskManagerParameters {
containeredParameters,
containerVolumes,
constraints,
+ tmCommand,
tmBootstrapCommand,
taskManagerHostname);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/AbstractMesosServices.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/AbstractMesosServices.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/AbstractMesosServices.java
new file mode 100644
index 0000000..e4f4cf7
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/AbstractMesosServices.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework.services;
+
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+
+import akka.actor.ActorSystem;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An abstract implementation of {@link MesosServices} which holds the local actor system and the
+ * artifact server and releases both of them when closed.
+ */
+public abstract class AbstractMesosServices implements MesosServices {
+
+	/** The actor system returned by {@link #getLocalActorSystem()}. */
+	private final ActorSystem localActorSystem;
+
+	/** The artifact server returned by {@link #getArtifactServer()}. */
+	private final MesosArtifactServer server;
+
+	/**
+	 * Creates the services around the given components.
+	 *
+	 * @param actorSystem the local actor system, must not be null
+	 * @param artifactServer the artifact server, must not be null
+	 */
+	protected AbstractMesosServices(ActorSystem actorSystem, MesosArtifactServer artifactServer) {
+		this.localActorSystem = checkNotNull(actorSystem);
+		this.server = checkNotNull(artifactServer);
+	}
+
+	@Override
+	public ActorSystem getLocalActorSystem() {
+		return localActorSystem;
+	}
+
+	@Override
+	public MesosArtifactServer getArtifactServer() {
+		return server;
+	}
+
+	/**
+	 * Shuts down the actor system and then stops the artifact server.
+	 *
+	 * <p>Both steps are always attempted; the first failure becomes the cause of the thrown
+	 * exception and a later failure is added to it as a suppressed exception.
+	 *
+	 * @param cleanup not evaluated by this base implementation
+	 * @throws Exception if at least one of the two steps failed
+	 */
+	@Override
+	public void close(boolean cleanup) throws Exception {
+		Throwable shutdownFailure = null;
+
+		// step 1: the actor system
+		try {
+			localActorSystem.shutdown();
+		} catch (Throwable t) {
+			shutdownFailure = ExceptionUtils.firstOrSuppressed(t, shutdownFailure);
+		}
+
+		// step 2: the artifact server
+		try {
+			server.stop();
+		} catch (Throwable t) {
+			shutdownFailure = ExceptionUtils.firstOrSuppressed(t, shutdownFailure);
+		}
+
+		if (shutdownFailure == null) {
+			return;
+		}
+
+		throw new FlinkException("Could not properly shut down the Mesos services.", shutdownFailure);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServices.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServices.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServices.java
index 5655bfc..6a64f4f 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServices.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServices.java
@@ -20,6 +20,9 @@ package org.apache.flink.mesos.runtime.clusterframework.services;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+
+import akka.actor.ActorSystem;
import java.util.concurrent.Executor;
@@ -42,6 +45,20 @@ public interface MesosServices {
Executor executor) throws Exception;
/**
+ * Gets a local {@link ActorSystem} which is used for child actors within
+ * {@link org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager}.
+ *
+ * @return a reference to an actor system.
+ */
+ ActorSystem getLocalActorSystem();
+
+ /**
+ * Gets the artifact server with which to serve essential resources to task managers.
+ * @return a reference to an artifact server.
+ */
+ MesosArtifactServer getArtifactServer();
+
+ /**
* Closes all state maintained by the mesos services implementation.
*
* @param cleanup is true if a cleanup shall be performed
http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
index 370a760..c5a8516 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
@@ -20,9 +20,16 @@ package org.apache.flink.mesos.runtime.clusterframework.services;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.mesos.configuration.MesosOptions;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.zookeeper.ZooKeeperUtilityFactory;
+import akka.actor.ActorSystem;
+
+import java.util.UUID;
+
/**
* Utilities for the {@link MesosServices}.
*/
@@ -32,15 +39,21 @@ public class MesosServicesUtils {
* Creates a {@link MesosServices} instance depending on the high availability settings.
*
* @param configuration containing the high availability settings
+ * @param hostname the hostname to advertise to remote clients
* @return a mesos services instance
* @throws Exception if the mesos services instance could not be created
*/
- public static MesosServices createMesosServices(Configuration configuration) throws Exception {
+ public static MesosServices createMesosServices(Configuration configuration, String hostname) throws Exception {
+
+ ActorSystem localActorSystem = AkkaUtils.createLocalActorSystem(configuration);
+
+ MesosArtifactServer artifactServer = createArtifactServer(configuration, hostname);
+
HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(configuration);
switch (highAvailabilityMode) {
case NONE:
- return new StandaloneMesosServices();
+ return new StandaloneMesosServices(localActorSystem, artifactServer);
case ZOOKEEPER:
final String zkMesosRootPath = configuration.getString(
@@ -50,10 +63,19 @@ public class MesosServicesUtils {
configuration,
zkMesosRootPath);
- return new ZooKeeperMesosServices(zooKeeperUtilityFactory);
+ return new ZooKeeperMesosServices(localActorSystem, artifactServer, zooKeeperUtilityFactory);
default:
throw new Exception("High availability mode " + highAvailabilityMode + " is not supported.");
}
}
+
+ private static MesosArtifactServer createArtifactServer(Configuration configuration, String hostname) throws Exception { // builds the artifact server passed to the MesosServices implementations
+ final int artifactServerPort = configuration.getInteger(MesosOptions.ARTIFACT_SERVER_PORT, 0); // 0 is supplied as the default; NOTE(review): presumably lets the server pick a free port - confirm
+
+ // a random prefix is affixed to artifact URLs to ensure uniqueness in the Mesos fetcher cache
+ final String artifactServerPrefix = UUID.randomUUID().toString();
+
+ return new MesosArtifactServer(artifactServerPrefix, hostname, artifactServerPort, configuration);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java
index aa3157f..b93fd29 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java
@@ -21,13 +21,20 @@ package org.apache.flink.mesos.runtime.clusterframework.services;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+
+import akka.actor.ActorSystem;
import java.util.concurrent.Executor;
/**
* {@link MesosServices} implementation for the standalone mode.
*/
-public class StandaloneMesosServices implements MesosServices {
+public class StandaloneMesosServices extends AbstractMesosServices {
+
+ protected StandaloneMesosServices(ActorSystem actorSystem, MesosArtifactServer artifactServer) {
+ super(actorSystem, artifactServer);
+ }
@Override
public MesosWorkerStore createMesosWorkerStore(Configuration configuration, Executor executor) {
@@ -36,5 +43,6 @@ public class StandaloneMesosServices implements MesosServices {
@Override
public void close(boolean cleanup) throws Exception {
+ super.close(cleanup);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
index 2883e4f..069cb83 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
@@ -21,25 +21,31 @@ package org.apache.flink.mesos.runtime.clusterframework.services;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
+import org.apache.flink.mesos.util.MesosArtifactServer;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperSharedCount;
import org.apache.flink.runtime.zookeeper.ZooKeeperSharedValue;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.runtime.zookeeper.ZooKeeperUtilityFactory;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
+import akka.actor.ActorSystem;
+
import java.util.concurrent.Executor;
/**
* {@link MesosServices} implementation for the ZooKeeper high availability based mode.
*/
-public class ZooKeeperMesosServices implements MesosServices {
+public class ZooKeeperMesosServices extends AbstractMesosServices {
// Factory to create ZooKeeper utility classes
private final ZooKeeperUtilityFactory zooKeeperUtilityFactory;
- public ZooKeeperMesosServices(ZooKeeperUtilityFactory zooKeeperUtilityFactory) {
+ public ZooKeeperMesosServices(ActorSystem actorSystem, MesosArtifactServer artifactServer, ZooKeeperUtilityFactory zooKeeperUtilityFactory) {
+ super(actorSystem, artifactServer);
this.zooKeeperUtilityFactory = Preconditions.checkNotNull(zooKeeperUtilityFactory);
}
@@ -64,7 +70,23 @@ public class ZooKeeperMesosServices implements MesosServices {
@Override
public void close(boolean cleanup) throws Exception {
- // this also closes the underlying CuratorFramework instance
- zooKeeperUtilityFactory.close(cleanup);
+ Throwable exception = null;
+
+ try {
+ // this also closes the underlying CuratorFramework instance
+ zooKeeperUtilityFactory.close(cleanup);
+ } catch (Throwable t) {
+ exception = ExceptionUtils.firstOrSuppressed(t, exception);
+ }
+
+ try {
+ super.close(cleanup);
+ } catch (Throwable t) {
+ exception = ExceptionUtils.firstOrSuppressed(t, exception);
+ }
+
+ if (exception != null) {
+ throw new FlinkException("Could not properly shut down the Mesos services.", exception);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
index 8bfb4d1..ff32486 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
@@ -251,6 +251,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
containeredParams,
Collections.<Protos.Volume>emptyList(),
Collections.<ConstraintEvaluator>emptyList(),
+ "",
Option.<String>empty(),
Option.<String>empty());
http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index e81a2de..4bbcb25 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.mesos.runtime.clusterframework;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
import org.apache.flink.mesos.scheduler.ConnectionMonitor;
import org.apache.flink.mesos.scheduler.LaunchCoordinator;
@@ -32,7 +33,7 @@ import org.apache.flink.mesos.scheduler.messages.ReRegistered;
import org.apache.flink.mesos.scheduler.messages.Registered;
import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
-import org.apache.flink.mesos.util.MesosArtifactResolver;
+import org.apache.flink.mesos.util.MesosArtifactServer;
import org.apache.flink.mesos.util.MesosConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
@@ -159,17 +160,15 @@ public class MesosResourceManagerTest extends TestLogger {
FatalErrorHandler fatalErrorHandler,
// Mesos specifics
- ActorSystem actorSystem,
Configuration flinkConfig,
+ MesosServices mesosServices,
MesosConfiguration mesosConfig,
- MesosWorkerStore workerStore,
MesosTaskManagerParameters taskManagerParameters,
- ContainerSpecification taskManagerContainerSpec,
- MesosArtifactResolver artifactResolver) {
+ ContainerSpecification taskManagerContainerSpec) {
super(rpcService, resourceManagerEndpointId, resourceId, resourceManagerConfiguration,
highAvailabilityServices, heartbeatServices, slotManager, metricRegistry,
- jobLeaderIdService, fatalErrorHandler, actorSystem, flinkConfig, mesosConfig, workerStore,
- taskManagerParameters, taskManagerContainerSpec, artifactResolver);
+ jobLeaderIdService, fatalErrorHandler, flinkConfig, mesosServices, mesosConfig,
+ taskManagerParameters, taskManagerContainerSpec);
}
@Override
@@ -208,6 +207,7 @@ public class MesosResourceManagerTest extends TestLogger {
TestingRpcService rpcService;
TestingFatalErrorHandler fatalErrorHandler;
MockMesosResourceManagerRuntimeServices rmServices;
+ MockMesosServices mesosServices;
// RM
ResourceManagerConfiguration rmConfiguration;
@@ -242,6 +242,7 @@ public class MesosResourceManagerTest extends TestLogger {
rpcService = new TestingRpcService();
fatalErrorHandler = new TestingFatalErrorHandler();
rmServices = new MockMesosResourceManagerRuntimeServices();
+ mesosServices = new MockMesosServices();
// TaskExecutor templating
ContainerSpecification containerSpecification = new ContainerSpecification();
@@ -249,7 +250,7 @@ public class MesosResourceManagerTest extends TestLogger {
new ContaineredTaskManagerParameters(1024, 768, 256, 4, new HashMap<String, String>());
MesosTaskManagerParameters tmParams = new MesosTaskManagerParameters(
1.0, MesosTaskManagerParameters.ContainerType.MESOS, Option.<String>empty(), containeredParams,
- Collections.<Protos.Volume>emptyList(), Collections.<ConstraintEvaluator>emptyList(), Option.<String>empty(),
+ Collections.<Protos.Volume>emptyList(), Collections.<ConstraintEvaluator>emptyList(), "", Option.<String>empty(),
Option.<String>empty());
// resource manager
@@ -270,13 +271,11 @@ public class MesosResourceManagerTest extends TestLogger {
rmServices.jobLeaderIdService,
fatalErrorHandler,
// Mesos specifics
- system,
flinkConfig,
+ mesosServices,
rmServices.mesosConfig,
- rmServices.workerStore,
tmParams,
- containerSpecification,
- rmServices.artifactResolver
+ containerSpecification
);
// TaskExecutors
@@ -341,7 +340,7 @@ public class MesosResourceManagerTest extends TestLogger {
public SchedulerDriver schedulerDriver;
public MesosConfiguration mesosConfig;
public MesosWorkerStore workerStore;
- public MesosArtifactResolver artifactResolver;
+ public MesosArtifactServer artifactServer;
MockMesosResourceManagerRuntimeServices() throws Exception {
schedulerDriver = mock(SchedulerDriver.class);
@@ -354,7 +353,28 @@ public class MesosResourceManagerTest extends TestLogger {
workerStore = mock(MesosWorkerStore.class);
when(workerStore.getFrameworkID()).thenReturn(Option.<Protos.FrameworkID>empty());
- artifactResolver = mock(MesosArtifactResolver.class);
+ artifactServer = mock(MesosArtifactServer.class);
+ }
+ }
+
+ class MockMesosServices implements MesosServices { // test double: serves the objects prepared in rmServices plus the test's actor system
+ @Override
+ public MesosWorkerStore createMesosWorkerStore(Configuration configuration, Executor executor) throws Exception {
+ return rmServices.workerStore; // the mocked worker store; both arguments are ignored
+ }
+
+ @Override
+ public ActorSystem getLocalActorSystem() {
+ return system;
+ }
+
+ @Override
+ public MesosArtifactServer getArtifactServer() {
+ return rmServices.artifactServer; // the mocked artifact server
+ }
+
+ @Override
+ public void close(boolean cleanup) throws Exception { // nothing to release in the mock
 }
 }
http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
old mode 100644
new mode 100755
index 2538f20..1551933
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
@@ -46,6 +47,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.GuardedBy;
+import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
@@ -92,6 +94,8 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
LOG.info("Starting {}.", getClass().getSimpleName());
try {
+ installDefaultFileSystem(configuration);
+
SecurityContext securityContext = installSecurityContext(configuration);
securityContext.runSecured(new Callable<Void>() {
@@ -115,6 +119,17 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
}
}
+ protected void installDefaultFileSystem(Configuration configuration) throws Exception { // invoked during startup, before the security context is installed
+ LOG.info("Install default filesystem.");
+
+ try {
+ FileSystem.setDefaultScheme(configuration);
+ } catch (IOException e) {
+ throw new IOException("Error while setting the default " + // rethrown with context; the original exception is kept as the cause
+ "filesystem scheme from configuration.", e);
+ }
+ }
+
protected SecurityContext installSecurityContext(Configuration configuration) throws Exception {
LOG.info("Install security context.");
@@ -184,9 +199,18 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
}
protected void shutDown(boolean cleanupHaData) throws FlinkException {
+ LOG.info("Stopping {}.", getClass().getSimpleName());
+
Throwable exception = null;
synchronized (lock) {
+
+ try {
+ stopClusterComponents(cleanupHaData);
+ } catch (Throwable t) {
+ exception = ExceptionUtils.firstOrSuppressed(t, exception);
+ }
+
if (metricRegistry != null) {
try {
metricRegistry.shutdown();
@@ -244,6 +268,9 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry) throws Exception;
+ protected void stopClusterComponents(boolean cleanupHaData) throws Exception { // hook called first inside shutDown(); no-op here, overridden by JobClusterEntrypoint
+ }
+
protected static ClusterConfiguration parseArguments(String[] args) {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index a7c6120..e70f6c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -110,7 +110,7 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
}
@Override
- protected void shutDown(boolean cleanupHaData) throws FlinkException {
+ protected void stopClusterComponents(boolean cleanupHaData) throws Exception {
Throwable exception = null;
if (jobManagerRunner != null) {
@@ -129,14 +129,8 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
}
}
- try {
- super.shutDown(cleanupHaData);
- } catch (Throwable t) {
- exception = ExceptionUtils.firstOrSuppressed(t, exception);
- }
-
if (exception != null) {
- throw new FlinkException("Could not properly shut down the session cluster entry point.", exception);
+ throw new FlinkException("Could not properly shut down the job cluster entry point.", exception);
}
}