You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/07/05 11:30:36 UTC
flink git commit: [FLINK-7074] [tm] Add entry point for the TaskManagerRunner
Repository: flink
Updated Branches:
refs/heads/master 1ba1260a8 -> 40dce2909
[FLINK-7074] [tm] Add entry point for the TaskManagerRunner
The entry point can be used in standalone mode to run a TaskManager. Moreover, the
YarnTaskExecutorRunner now reuses some of the startup logic of the TaskManagerRunner.
This closes #4252.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40dce290
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40dce290
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40dce290
Branch: refs/heads/master
Commit: 40dce2909e9a5c3d915bd8c27c3e59bc102e9a15
Parents: 1ba1260
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jul 4 15:01:18 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 5 13:28:58 2017 +0200
----------------------------------------------------------------------
.../flink/runtime/minicluster/MiniCluster.java | 53 ++++-
.../runtime/taskexecutor/TaskManagerRunner.java | 237 ++++++++++++++-----
.../apache/flink/runtime/akka/AkkaUtils.scala | 6 +
.../flink/yarn/YarnTaskExecutorRunner.java | 135 ++---------
4 files changed, 235 insertions(+), 196 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/40dce290/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 3e7f2f3..6d3a3c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -39,8 +39,10 @@ import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRunner;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.util.ExceptionUtils;
@@ -90,7 +92,7 @@ public class MiniCluster {
private ResourceManagerRunner[] resourceManagerRunners;
@GuardedBy("lock")
- private TaskManagerRunner[] taskManagerRunners;
+ private TaskExecutor[] taskManagers;
@GuardedBy("lock")
private MiniClusterJobDispatcher jobDispatcher;
@@ -253,7 +255,7 @@ public class MiniCluster {
// bring up the TaskManager(s) for the mini cluster
LOG.info("Starting {} TaskManger(s)", numTaskManagers);
- taskManagerRunners = startTaskManagers(
+ taskManagers = startTaskManagers(
configuration, haServices, metricRegistry, numTaskManagers, taskManagerRpcServices);
// bring up the dispatcher that launches JobManagers when jobs submitted
@@ -338,17 +340,17 @@ public class MiniCluster {
resourceManagerRunners = null;
}
- if (taskManagerRunners != null) {
- for (TaskManagerRunner tm : taskManagerRunners) {
+ if (taskManagers != null) {
+ for (TaskExecutor tm : taskManagers) {
if (tm != null) {
try {
- tm.shutDown(null);
+ tm.shutDown();
} catch (Throwable t) {
exception = firstOrSuppressed(t, exception);
}
}
}
- taskManagerRunners = null;
+ taskManagers = null;
}
// shut down the RpcServices
@@ -402,7 +404,7 @@ public class MiniCluster {
final ResourceManagerGateway resourceManager =
commonRpcService.connect(addressAndId.leaderAddress(), ResourceManagerGateway.class).get();
- final int numTaskManagersToWaitFor = taskManagerRunners.length;
+ final int numTaskManagersToWaitFor = taskManagers.length;
// poll and wait until enough TaskManagers are available
while (true) {
@@ -540,30 +542,31 @@ public class MiniCluster {
return resourceManagerRunners;
}
- protected TaskManagerRunner[] startTaskManagers(
+ protected TaskExecutor[] startTaskManagers(
Configuration configuration,
HighAvailabilityServices haServices,
MetricRegistry metricRegistry,
int numTaskManagers,
RpcService[] taskManagerRpcServices) throws Exception {
- final TaskManagerRunner[] taskManagerRunners = new TaskManagerRunner[numTaskManagers];
+ final TaskExecutor[] taskExecutors = new TaskExecutor[numTaskManagers];
final boolean localCommunication = numTaskManagers == 1;
for (int i = 0; i < numTaskManagers; i++) {
- taskManagerRunners[i] = new TaskManagerRunner(
+ taskExecutors[i] = TaskManagerRunner.startTaskManager(
configuration,
new ResourceID(UUID.randomUUID().toString()),
taskManagerRpcServices[i],
haServices,
heartbeatServices,
metricRegistry,
- localCommunication);
+ localCommunication,
+ new TerminatingFatalErrorHandler(i));
- taskManagerRunners[i].start();
+ taskExecutors[i].start();
}
- return taskManagerRunners;
+ return taskExecutors;
}
// ------------------------------------------------------------------------
@@ -614,4 +617,28 @@ public class MiniCluster {
return config;
}
+
+ private class TerminatingFatalErrorHandler implements FatalErrorHandler {
+
+ private final int index;
+
+ private TerminatingFatalErrorHandler(int index) {
+ this.index = index;
+ }
+
+ @Override
+ public void onFatalError(Throwable exception) {
+ LOG.error("TaskManager #{} failed.", index, exception);
+
+ try {
+ synchronized (lock) {
+ if (taskManagers[index] != null) {
+ taskManagers[index].shutDown();
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("TaskManager #{} could not be properly terminated.", index, e);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/40dce290/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 2ed1578..78b49ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -19,27 +19,41 @@
package org.apache.flink.runtime.taskexecutor;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.runtime.util.Hardware;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
-import java.util.concurrent.Executor;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -52,92 +66,58 @@ public class TaskManagerRunner implements FatalErrorHandler {
private static final Logger LOG = LoggerFactory.getLogger(TaskManagerRunner.class);
+ private static final int STARTUP_FAILURE_RETURN_CODE = 1;
+
+ private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
private final Object lock = new Object();
private final Configuration configuration;
private final ResourceID resourceID;
+ private final Time timeout;
+
private final RpcService rpcService;
private final HighAvailabilityServices highAvailabilityServices;
+ private final MetricRegistry metricRegistry;
+
/** Executor used to run future callbacks */
- private final Executor executor;
+ private final ExecutorService executor;
private final TaskExecutor taskManager;
- public TaskManagerRunner(
- Configuration configuration,
- ResourceID resourceID,
- RpcService rpcService,
- HighAvailabilityServices highAvailabilityServices,
- HeartbeatServices heartbeatServices,
- MetricRegistry metricRegistry) throws Exception {
-
- this(
- configuration,
- resourceID,
- rpcService,
- highAvailabilityServices,
- heartbeatServices,
- metricRegistry,
- false);
- }
-
- public TaskManagerRunner(
- Configuration configuration,
- ResourceID resourceID,
- RpcService rpcService,
- HighAvailabilityServices highAvailabilityServices,
- HeartbeatServices heartbeatServices,
- MetricRegistry metricRegistry,
- boolean localCommunicationOnly) throws Exception {
-
+ public TaskManagerRunner(Configuration configuration, ResourceID resourceId) throws Exception {
this.configuration = Preconditions.checkNotNull(configuration);
- this.resourceID = Preconditions.checkNotNull(resourceID);
- this.rpcService = Preconditions.checkNotNull(rpcService);
- this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
- this.executor = rpcService.getExecutor();
+ this.resourceID = Preconditions.checkNotNull(resourceId);
- InetAddress remoteAddress = InetAddress.getByName(rpcService.getAddress());
+ timeout = AkkaUtils.getTimeoutAsTime(configuration);
- TaskManagerServicesConfiguration taskManagerServicesConfiguration =
- TaskManagerServicesConfiguration.fromConfiguration(
- configuration,
- remoteAddress,
- localCommunicationOnly);
+ this.executor = java.util.concurrent.Executors.newScheduledThreadPool(
+ Hardware.getNumberCPUCores(),
+ new ExecutorThreadFactory("taskmanager-future"));
- TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
- taskManagerServicesConfiguration,
- resourceID);
+ highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+ configuration,
+ executor,
+ HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
- TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
+ rpcService = createRpcService(configuration, highAvailabilityServices);
- TaskManagerMetricGroup taskManagerMetricGroup = new TaskManagerMetricGroup(
- metricRegistry,
- taskManagerServices.getTaskManagerLocation().getHostname(),
- resourceID.toString());
+ HeartbeatServices heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
- // Initialize the TM metrics
- TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, taskManagerServices.getNetworkEnvironment());
+ metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration));
- this.taskManager = new TaskExecutor(
+ taskManager = startTaskManager(
+ configuration,
+ resourceId,
rpcService,
- taskManagerConfiguration,
- taskManagerServices.getTaskManagerLocation(),
- taskManagerServices.getMemoryManager(),
- taskManagerServices.getIOManager(),
- taskManagerServices.getNetworkEnvironment(),
highAvailabilityServices,
heartbeatServices,
metricRegistry,
- taskManagerMetricGroup,
- taskManagerServices.getBroadcastVariableManager(),
- taskManagerServices.getFileCache(),
- taskManagerServices.getTaskSlotTable(),
- taskManagerServices.getJobManagerTable(),
- taskManagerServices.getJobLeaderService(),
+ false,
this);
}
@@ -149,16 +129,34 @@ public class TaskManagerRunner implements FatalErrorHandler {
taskManager.start();
}
- public void shutDown(Throwable cause) {
+ public void shutDown() throws Exception {
shutDownInternally();
}
- protected void shutDownInternally() {
+ protected void shutDownInternally() throws Exception {
+ Exception exception = null;
+
synchronized(lock) {
try {
taskManager.shutDown();
} catch (Exception e) {
- LOG.error("Could not properly shut down the task manager.", e);
+ exception = e;
+ }
+
+ metricRegistry.shutdown();
+
+ rpcService.stopService();
+
+ try {
+ highAvailabilityServices.close();
+ } catch (Exception e) {
+ exception = ExceptionUtils.firstOrSuppressed(e, exception);
+ }
+
+ Executors.gracefulShutdown(timeout.toMilliseconds(), TimeUnit.MILLISECONDS, executor);
+
+ if (exception != null) {
+ throw exception;
}
}
}
@@ -175,13 +173,122 @@ public class TaskManagerRunner implements FatalErrorHandler {
@Override
public void onFatalError(Throwable exception) {
LOG.error("Fatal error occurred while executing the TaskManager. Shutting it down...", exception);
- shutDown(exception);
+
+ try {
+ shutDown();
+ } catch (Throwable t) {
+ LOG.error("Could not properly shut down TaskManager.", t);
+ }
+
+ System.exit(RUNTIME_FAILURE_RETURN_CODE);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Static entry point
+ // --------------------------------------------------------------------------------------------
+
+ public static void main(String[] args) throws Exception {
+ // startup checks and logging
+ EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager", args);
+ SignalHandler.register(LOG);
+ JvmShutdownSafeguard.installAsShutdownHook(LOG);
+
+ long maxOpenFileHandles = EnvironmentInformation.getOpenFileHandlesLimit();
+
+ if (maxOpenFileHandles != -1L) {
+ LOG.info("Maximum number of open file descriptors is {}.", maxOpenFileHandles);
+ } else {
+ LOG.info("Cannot determine the maximum number of open file descriptors");
+ }
+
+ ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+ final String configDir = parameterTool.get("configDir");
+
+ final Configuration configuration = GlobalConfiguration.loadConfiguration(configDir);
+
+ SecurityUtils.install(new SecurityUtils.SecurityConfiguration(configuration));
+
+ try {
+ SecurityUtils.getInstalledContext().runSecured(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ runTaskManager(configuration, ResourceID.generate());
+ return null;
+ }
+ });
+ } catch (Throwable t) {
+ LOG.error("TaskManager initialization failed.", t);
+ System.exit(STARTUP_FAILURE_RETURN_CODE);
+ }
+ }
+
+ public static void runTaskManager(Configuration configuration, ResourceID resourceId) throws Exception {
+ final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, resourceId);
+
+ taskManagerRunner.start();
}
// --------------------------------------------------------------------------------------------
// Static utilities
// --------------------------------------------------------------------------------------------
+ public static TaskExecutor startTaskManager(
+ Configuration configuration,
+ ResourceID resourceID,
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ HeartbeatServices heartbeatServices,
+ MetricRegistry metricRegistry,
+ boolean localCommunicationOnly,
+ FatalErrorHandler fatalErrorHandler) throws Exception {
+
+ Preconditions.checkNotNull(configuration);
+ Preconditions.checkNotNull(resourceID);
+ Preconditions.checkNotNull(rpcService);
+ Preconditions.checkNotNull(highAvailabilityServices);
+
+ InetAddress remoteAddress = InetAddress.getByName(rpcService.getAddress());
+
+ TaskManagerServicesConfiguration taskManagerServicesConfiguration =
+ TaskManagerServicesConfiguration.fromConfiguration(
+ configuration,
+ remoteAddress,
+ localCommunicationOnly);
+
+ TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
+ taskManagerServicesConfiguration,
+ resourceID);
+
+ TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
+
+ TaskManagerMetricGroup taskManagerMetricGroup = new TaskManagerMetricGroup(
+ metricRegistry,
+ taskManagerServices.getTaskManagerLocation().getHostname(),
+ resourceID.toString());
+
+ // Initialize the TM metrics
+ TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, taskManagerServices.getNetworkEnvironment());
+
+ return new TaskExecutor(
+ rpcService,
+ taskManagerConfiguration,
+ taskManagerServices.getTaskManagerLocation(),
+ taskManagerServices.getMemoryManager(),
+ taskManagerServices.getIOManager(),
+ taskManagerServices.getNetworkEnvironment(),
+ highAvailabilityServices,
+ heartbeatServices,
+ metricRegistry,
+ taskManagerMetricGroup,
+ taskManagerServices.getBroadcastVariableManager(),
+ taskManagerServices.getFileCache(),
+ taskManagerServices.getTaskSlotTable(),
+ taskManagerServices.getJobManagerTable(),
+ taskManagerServices.getJobLeaderService(),
+ fatalErrorHandler);
+ }
+
/**
* Create a RPC service for the task manager.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/40dce290/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index b74a9a3..408cc93 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -567,6 +567,12 @@ object AkkaUtils {
new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS)
}
+ def getTimeoutAsTime(config: Configuration): Time = {
+ val duration = Duration(config.getString(AkkaOptions.ASK_TIMEOUT))
+
+ Time.milliseconds(duration.toMillis)
+ }
+
def getDefaultTimeout: Time = {
val duration = Duration(AkkaOptions.ASK_TIMEOUT.defaultValue())
http://git-wip-us.apache.org/repos/asf/flink/blob/40dce290/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
index 2ed4c1d..e0dc55d 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
@@ -25,12 +25,6 @@ import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
-import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.runtime.util.EnvironmentInformation;
@@ -61,14 +55,6 @@ public class YarnTaskExecutorRunner {
/** The exit code returned if the initialization of the yarn task executor runner failed. */
private static final int INIT_ERROR_EXIT_CODE = 31;
- private MetricRegistry metricRegistry;
-
- private HighAvailabilityServices haServices;
-
- private RpcService taskExecutorRpcService;
-
- private TaskManagerRunner taskManagerRunner;
-
// ------------------------------------------------------------------------
// Program entry point
// ------------------------------------------------------------------------
@@ -83,20 +69,17 @@ public class YarnTaskExecutorRunner {
SignalHandler.register(LOG);
JvmShutdownSafeguard.installAsShutdownHook(LOG);
- // run and exit with the proper return code
- int returnCode = new YarnTaskExecutorRunner().run(args);
- System.exit(returnCode);
+ run(args);
}
/**
- * The instance entry point for the YARN task executor. Obtains user group
- * information and calls the main work method {@link #runTaskExecutor(org.apache.flink.configuration.Configuration)} as a
+ * The instance entry point for the YARN task executor. Obtains user group information and calls
+ * the main work method {@link TaskManagerRunner#runTaskManager(Configuration, ResourceID)} as a
* privileged action.
*
* @param args The command line arguments.
- * @return The process exit code.
*/
- protected int run(String[] args) {
+ private static void run(String[] args) {
try {
LOG.debug("All environment variables: {}", ENV);
@@ -166,114 +149,30 @@ public class YarnTaskExecutorRunner {
configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
}
- SecurityUtils.install(sc);
-
- return SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
- @Override
- public Integer call() throws Exception {
- return runTaskExecutor(configuration);
- }
- });
-
- }
- catch (Throwable t) {
- // make sure that everything whatever ends up in the log
- LOG.error("YARN Application Master initialization failed", t);
- return INIT_ERROR_EXIT_CODE;
- }
- }
-
- // ------------------------------------------------------------------------
- // Core work method
- // ------------------------------------------------------------------------
-
- /**
- * The main work method, must run as a privileged action.
- *
- * @return The return code for the Java process.
- */
- protected int runTaskExecutor(Configuration config) {
-
- try {
- // ---- (1) create common services
- // first get the ResouceId, resource id is the container id for yarn.
final String containerId = ENV.get(YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID);
Preconditions.checkArgument(containerId != null,
- "ContainerId variable %s not set", YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID);
+ "ContainerId variable %s not set", YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID);
+
// use the hostname passed by job manager
final String taskExecutorHostname = ENV.get(YarnResourceManager.ENV_FLINK_NODE_ID);
if (taskExecutorHostname != null) {
- config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, taskExecutorHostname);
+ configuration.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, taskExecutorHostname);
}
- ResourceID resourceID = new ResourceID(containerId);
- LOG.info("YARN assigned resource id {} for the task executor.", resourceID.toString());
-
- taskExecutorRpcService = TaskManagerRunner.createRpcService(config, haServices);
-
- haServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
- config,
- taskExecutorRpcService.getExecutor(),
- HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
-
- HeartbeatServices heartbeatServices = HeartbeatServices.fromConfiguration(config);
-
- metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
-
- // ---- (2) init task manager runner -------
- taskManagerRunner = new TaskManagerRunner(
- config,
- resourceID,
- taskExecutorRpcService,
- haServices,
- heartbeatServices,
- metricRegistry);
-
- // ---- (3) start the task manager runner
- taskManagerRunner.start();
- LOG.debug("YARN task executor started");
+ SecurityUtils.install(sc);
- taskManagerRunner.getTerminationFuture().get();
- // everything started, we can wait until all is done or the process is killed
- LOG.info("YARN task manager runner finished");
- shutdown();
+ SecurityUtils.getInstalledContext().runSecured(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ TaskManagerRunner.runTaskManager(configuration, new ResourceID(containerId));
+ return null;
+ }
+ });
}
catch (Throwable t) {
// make sure that everything whatever ends up in the log
- LOG.error("YARN task executor initialization failed", t);
- shutdown();
- return INIT_ERROR_EXIT_CODE;
+ LOG.error("YARN TaskManager initialization failed.", t);
+ System.exit(INIT_ERROR_EXIT_CODE);
}
-
- return 0;
}
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- protected void shutdown() {
- if (taskExecutorRpcService != null) {
- try {
- taskExecutorRpcService.stopService();
- } catch (Throwable tt) {
- LOG.error("Error shutting down job master rpc service", tt);
- }
- }
- if (haServices != null) {
- try {
- haServices.close();
- } catch (Throwable tt) {
- LOG.warn("Failed to stop the HA service", tt);
- }
- }
- if (metricRegistry != null) {
- try {
- metricRegistry.shutdown();
- } catch (Throwable tt) {
- LOG.warn("Failed to stop the metrics registry", tt);
- }
- }
- }
-
}