You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/01/31 12:56:18 UTC
[flink] branch master updated: [FLINK-11384][mesos] Remove legacy
MesosTaskManager
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 119128f [FLINK-11384][mesos] Remove legacy MesosTaskManager
119128f is described below
commit 119128fb1d05ab5b9539a00fe1c381e8af4a7304
Author: ZILI CHEN <wa...@gmail.com>
AuthorDate: Thu Jan 31 20:56:10 2019 +0800
[FLINK-11384][mesos] Remove legacy MesosTaskManager
---
.../clusterframework/MesosTaskManagerRunner.java | 121 ---------------------
.../clusterframework/MesosTaskManager.scala | 71 ------------
2 files changed, 192 deletions(-)
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
deleted file mode 100644
index 74201d9..0000000
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.mesos.runtime.clusterframework;
-
-import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.mesos.entrypoint.MesosEntrypointUtils;
-import org.apache.flink.runtime.clusterframework.BootstrapTools;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.security.SecurityConfiguration;
-import org.apache.flink.runtime.security.SecurityUtils;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.runtime.util.JvmShutdownSafeguard;
-import org.apache.flink.runtime.util.SignalHandler;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.Callable;
-
-/**
- * The entry point for running a TaskManager in a Mesos container.
- */
-public class MesosTaskManagerRunner {
-
- private static final Logger LOG = LoggerFactory.getLogger(MesosTaskManagerRunner.class);
-
- private static final Options ALL_OPTIONS;
-
- static {
- ALL_OPTIONS =
- new Options()
- .addOption(BootstrapTools.newDynamicPropertiesOption());
- }
-
- /** The process environment variables. */
- private static final Map<String, String> ENV = System.getenv();
-
- public static void runTaskManager(String[] args, final Class<? extends TaskManager> taskManager) throws Exception {
- EnvironmentInformation.logEnvironmentInfo(LOG, taskManager.getSimpleName(), args);
- SignalHandler.register(LOG);
- JvmShutdownSafeguard.installAsShutdownHook(LOG);
-
- // try to parse the command line arguments
- CommandLineParser parser = new PosixParser();
- CommandLine cmd = parser.parse(ALL_OPTIONS, args);
-
- final Configuration configuration;
- try {
- final Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
- LOG.debug("Mesos dynamic properties: {}", dynamicProperties);
-
- configuration = MesosEntrypointUtils.loadConfiguration(dynamicProperties, LOG);
- }
- catch (Throwable t) {
- LOG.error("Failed to load the TaskManager configuration and dynamic properties.", t);
- System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
- return;
- }
-
- final Map<String, String> envs = System.getenv();
-
- // configure the filesystems
- try {
- FileSystem.initialize(configuration);
- } catch (IOException e) {
- throw new IOException("Error while confoguring the filesystems.", e);
- }
-
- // tell akka to die in case of an error
- configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
-
- // Infer the resource identifier from the environment variable
- String containerID = Preconditions.checkNotNull(envs.get(MesosConfigKeys.ENV_FLINK_CONTAINER_ID));
- final ResourceID resourceId = new ResourceID(containerID);
- LOG.info("ResourceID assigned for this container: {}", resourceId);
-
- // Run the TM in the security context
- SecurityConfiguration sc = new SecurityConfiguration(configuration);
- SecurityUtils.install(sc);
-
- try {
- SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
- @Override
- public Integer call() throws Exception {
- TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, taskManager);
- return 0;
- }
- });
- }
- catch (Throwable t) {
- LOG.error("Error while starting the TaskManager", t);
- System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
- }
- }
-}
diff --git a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
deleted file mode 100644
index 12cc375..0000000
--- a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.mesos.runtime.clusterframework
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices
-import org.apache.flink.runtime.io.disk.iomanager.IOManager
-import org.apache.flink.runtime.io.network.NetworkEnvironment
-import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
-import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager
-import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
-import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
-
-/** An extension of the TaskManager that listens for additional Mesos-related
- * messages.
- */
-class MesosTaskManager(
- config: TaskManagerConfiguration,
- resourceID: ResourceID,
- taskManagerLocation: TaskManagerLocation,
- memoryManager: MemoryManager,
- ioManager: IOManager,
- network: NetworkEnvironment,
- taskManagerLocalStateStoresManager: TaskExecutorLocalStateStoresManager,
- numberOfSlots: Int,
- highAvailabilityServices: HighAvailabilityServices,
- taskManagerMetricGroup : TaskManagerMetricGroup)
- extends TaskManager(
- config,
- resourceID,
- taskManagerLocation,
- memoryManager,
- ioManager,
- network,
- taskManagerLocalStateStoresManager,
- numberOfSlots,
- highAvailabilityServices,
- taskManagerMetricGroup) {
-
- override def handleMessage: Receive = {
- super.handleMessage
- }
-}
-
-object MesosTaskManager {
- /** Entry point (main method) to run the TaskManager on Mesos.
- *
- * @param args The command line arguments.
- */
- def main(args: Array[String]): Unit = {
- MesosTaskManagerRunner.runTaskManager(args, classOf[MesosTaskManager])
- }
-
-}