You are viewing a plain-text version of this content. The link to its canonical version ("here") is not preserved in plain text.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/07/05 11:30:36 UTC

flink git commit: [FLINK-7074] [tm] Add entry point for the TaskManagerRunner

Repository: flink
Updated Branches:
  refs/heads/master 1ba1260a8 -> 40dce2909


[FLINK-7074] [tm] Add entry point for the TaskManagerRunner

The entry point can be used in standalone mode to run a TaskManager. Moreover, the
YarnTaskExecutorRunner now reuses some of the startup logic of the TaskManagerRunner.

This closes #4252.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40dce290
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40dce290
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40dce290

Branch: refs/heads/master
Commit: 40dce2909e9a5c3d915bd8c27c3e59bc102e9a15
Parents: 1ba1260
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jul 4 15:01:18 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 5 13:28:58 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/minicluster/MiniCluster.java  |  53 ++++-
 .../runtime/taskexecutor/TaskManagerRunner.java | 237 ++++++++++++++-----
 .../apache/flink/runtime/akka/AkkaUtils.scala   |   6 +
 .../flink/yarn/YarnTaskExecutorRunner.java      | 135 ++---------
 4 files changed, 235 insertions(+), 196 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/40dce290/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 3e7f2f3..6d3a3c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -39,8 +39,10 @@ import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRunner;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.util.ExceptionUtils;
 
@@ -90,7 +92,7 @@ public class MiniCluster {
 	private ResourceManagerRunner[] resourceManagerRunners;
 
 	@GuardedBy("lock")
-	private TaskManagerRunner[] taskManagerRunners;
+	private TaskExecutor[] taskManagers;
 
 	@GuardedBy("lock")
 	private MiniClusterJobDispatcher jobDispatcher;
@@ -253,7 +255,7 @@ public class MiniCluster {
 
 				// bring up the TaskManager(s) for the mini cluster
 				LOG.info("Starting {} TaskManger(s)", numTaskManagers);
-				taskManagerRunners = startTaskManagers(
+				taskManagers = startTaskManagers(
 						configuration, haServices, metricRegistry, numTaskManagers, taskManagerRpcServices);
 
 				// bring up the dispatcher that launches JobManagers when jobs submitted
@@ -338,17 +340,17 @@ public class MiniCluster {
 			resourceManagerRunners = null;
 		}
 
-		if (taskManagerRunners != null) {
-			for (TaskManagerRunner tm : taskManagerRunners) {
+		if (taskManagers != null) {
+			for (TaskExecutor tm : taskManagers) {
 				if (tm != null) {
 					try {
-						tm.shutDown(null);
+						tm.shutDown();
 					} catch (Throwable t) {
 						exception = firstOrSuppressed(t, exception);
 					}
 				}
 			}
-			taskManagerRunners = null;
+			taskManagers = null;
 		}
 
 		// shut down the RpcServices
@@ -402,7 +404,7 @@ public class MiniCluster {
 			final ResourceManagerGateway resourceManager = 
 					commonRpcService.connect(addressAndId.leaderAddress(), ResourceManagerGateway.class).get();
 
-			final int numTaskManagersToWaitFor = taskManagerRunners.length;
+			final int numTaskManagersToWaitFor = taskManagers.length;
 
 			// poll and wait until enough TaskManagers are available
 			while (true) {
@@ -540,30 +542,31 @@ public class MiniCluster {
 		return resourceManagerRunners;
 	}
 
-	protected TaskManagerRunner[] startTaskManagers(
+	protected TaskExecutor[] startTaskManagers(
 			Configuration configuration,
 			HighAvailabilityServices haServices,
 			MetricRegistry metricRegistry,
 			int numTaskManagers,
 			RpcService[] taskManagerRpcServices) throws Exception {
 
-		final TaskManagerRunner[] taskManagerRunners = new TaskManagerRunner[numTaskManagers];
+		final TaskExecutor[] taskExecutors = new TaskExecutor[numTaskManagers];
 		final boolean localCommunication = numTaskManagers == 1;
 
 		for (int i = 0; i < numTaskManagers; i++) {
-			taskManagerRunners[i] = new TaskManagerRunner(
+			taskExecutors[i] = TaskManagerRunner.startTaskManager(
 				configuration,
 				new ResourceID(UUID.randomUUID().toString()),
 				taskManagerRpcServices[i],
 				haServices,
 				heartbeatServices,
 				metricRegistry,
-				localCommunication);
+				localCommunication,
+				new TerminatingFatalErrorHandler(i));
 
-			taskManagerRunners[i].start();
+			taskExecutors[i].start();
 		}
 
-		return taskManagerRunners;
+		return taskExecutors;
 	}
 
 	// ------------------------------------------------------------------------
@@ -614,4 +617,28 @@ public class MiniCluster {
 
 		return config;
 	}
+
+	private class TerminatingFatalErrorHandler implements FatalErrorHandler {
+
+		private final int index;
+
+		private TerminatingFatalErrorHandler(int index) {
+			this.index = index;
+		}
+
+		@Override
+		public void onFatalError(Throwable exception) {
+			LOG.error("TaskManager #{} failed.", index, exception);
+			// taskManagers is still null while startTaskManagers() runs and is reset to null on shutdown
+			try {
+				synchronized (lock) {
+					if (taskManagers != null && taskManagers[index] != null) {
+						taskManagers[index].shutDown();
+					}
+				}
+			} catch (Exception e) {
+				LOG.error("TaskManager #{} could not be properly terminated.", index, e);
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/40dce290/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 2ed1578..78b49ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -19,27 +19,41 @@
 package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.runtime.util.Hardware;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetAddress;
-import java.util.concurrent.Executor;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -52,92 +66,58 @@ public class TaskManagerRunner implements FatalErrorHandler {
 
 	private static final Logger LOG = LoggerFactory.getLogger(TaskManagerRunner.class);
 
+	private static final int STARTUP_FAILURE_RETURN_CODE = 1;
+
+	private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
 	private final Object lock = new Object();
 
 	private final Configuration configuration;
 
 	private final ResourceID resourceID;
 
+	private final Time timeout;
+
 	private final RpcService rpcService;
 
 	private final HighAvailabilityServices highAvailabilityServices;
 
+	private final MetricRegistry metricRegistry;
+
 	/** Executor used to run future callbacks */
-	private final Executor executor;
+	private final ExecutorService executor;
 
 	private final TaskExecutor taskManager;
 
-	public TaskManagerRunner(
-			Configuration configuration,
-			ResourceID resourceID,
-			RpcService rpcService,
-			HighAvailabilityServices highAvailabilityServices,
-			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry) throws Exception {
-
-		this(
-			configuration,
-			resourceID,
-			rpcService,
-			highAvailabilityServices,
-			heartbeatServices,
-			metricRegistry,
-			false);
-	}
-
-	public TaskManagerRunner(
-			Configuration configuration,
-			ResourceID resourceID,
-			RpcService rpcService,
-			HighAvailabilityServices highAvailabilityServices,
-			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry,
-			boolean localCommunicationOnly) throws Exception {
-
+	public TaskManagerRunner(Configuration configuration, ResourceID resourceId) throws Exception {
 		this.configuration = Preconditions.checkNotNull(configuration);
-		this.resourceID = Preconditions.checkNotNull(resourceID);
-		this.rpcService = Preconditions.checkNotNull(rpcService);
-		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
-		this.executor = rpcService.getExecutor();
+		this.resourceID = Preconditions.checkNotNull(resourceId);
 
-		InetAddress remoteAddress = InetAddress.getByName(rpcService.getAddress());
+		timeout = AkkaUtils.getTimeoutAsTime(configuration);
 
-		TaskManagerServicesConfiguration taskManagerServicesConfiguration = 
-				TaskManagerServicesConfiguration.fromConfiguration(
-						configuration,
-						remoteAddress,
-						localCommunicationOnly);
+		this.executor = java.util.concurrent.Executors.newScheduledThreadPool(
+			Hardware.getNumberCPUCores(),
+			new ExecutorThreadFactory("taskmanager-future"));
 
-		TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
-			taskManagerServicesConfiguration,
-			resourceID);
+		highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+			configuration,
+			executor,
+			HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
 
-		TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
+		rpcService = createRpcService(configuration, highAvailabilityServices);
 
-		TaskManagerMetricGroup taskManagerMetricGroup = new TaskManagerMetricGroup(
-			metricRegistry,
-			taskManagerServices.getTaskManagerLocation().getHostname(),
-			resourceID.toString());
+		HeartbeatServices heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
 
-		// Initialize the TM metrics
-		TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, taskManagerServices.getNetworkEnvironment());
+		metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration));
 
-		this.taskManager = new TaskExecutor(
+		taskManager = startTaskManager(
+			configuration,
+			resourceId,
 			rpcService,
-			taskManagerConfiguration,
-			taskManagerServices.getTaskManagerLocation(),
-			taskManagerServices.getMemoryManager(),
-			taskManagerServices.getIOManager(),
-			taskManagerServices.getNetworkEnvironment(),
 			highAvailabilityServices,
 			heartbeatServices,
 			metricRegistry,
-			taskManagerMetricGroup,
-			taskManagerServices.getBroadcastVariableManager(),
-			taskManagerServices.getFileCache(),
-			taskManagerServices.getTaskSlotTable(),
-			taskManagerServices.getJobManagerTable(),
-			taskManagerServices.getJobLeaderService(),
+			false,
 			this);
 	}
 
@@ -149,16 +129,34 @@ public class TaskManagerRunner implements FatalErrorHandler {
 		taskManager.start();
 	}
 
-	public void shutDown(Throwable cause) {
+	public void shutDown() throws Exception {
 		shutDownInternally();
 	}
 
-	protected void shutDownInternally() {
+	protected void shutDownInternally() throws Exception {
+		Exception exception = null;
+
 		synchronized(lock) {
 			try {
 				taskManager.shutDown();
 			} catch (Exception e) {
-				LOG.error("Could not properly shut down the task manager.", e);
+				exception = e;
+			}
+
+			metricRegistry.shutdown();
+
+			rpcService.stopService();
+
+			try {
+				highAvailabilityServices.close();
+			} catch (Exception e) {
+				exception = ExceptionUtils.firstOrSuppressed(e, exception);
+			}
+
+			Executors.gracefulShutdown(timeout.toMilliseconds(), TimeUnit.MILLISECONDS, executor);
+
+			if (exception != null) {
+				throw exception;
 			}
 		}
 	}
@@ -175,13 +173,122 @@ public class TaskManagerRunner implements FatalErrorHandler {
 	@Override
 	public void onFatalError(Throwable exception) {
 		LOG.error("Fatal error occurred while executing the TaskManager. Shutting it down...", exception);
-		shutDown(exception);
+
+		try {
+			shutDown();
+		} catch (Throwable t) {
+			LOG.error("Could not properly shut down TaskManager.", t);
+		}
+
+		System.exit(RUNTIME_FAILURE_RETURN_CODE);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Static entry point
+	// --------------------------------------------------------------------------------------------
+
+	public static void main(String[] args) throws Exception {
+		// startup checks and logging
+		EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager", args);
+		SignalHandler.register(LOG);
+		JvmShutdownSafeguard.installAsShutdownHook(LOG);
+
+		long maxOpenFileHandles = EnvironmentInformation.getOpenFileHandlesLimit();
+
+		if (maxOpenFileHandles != -1L) {
+			LOG.info("Maximum number of open file descriptors is {}.", maxOpenFileHandles);
+		} else {
+			LOG.info("Cannot determine the maximum number of open file descriptors");
+		}
+
+		// Parsing the arguments, loading the configuration and installing the security context can
+		// fail as well, so they run inside the try block and exit with STARTUP_FAILURE_RETURN_CODE.
+		try {
+			final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+			final String configDir = parameterTool.get("configDir");
+			final Configuration configuration = GlobalConfiguration.loadConfiguration(configDir);
+
+			SecurityUtils.install(new SecurityUtils.SecurityConfiguration(configuration));
+
+			SecurityUtils.getInstalledContext().runSecured(new Callable<Void>() {
+				@Override
+				public Void call() throws Exception {
+					runTaskManager(configuration, ResourceID.generate());
+					return null;
+				}
+			});
+		} catch (Throwable t) {
+			LOG.error("TaskManager initialization failed.", t);
+			System.exit(STARTUP_FAILURE_RETURN_CODE);
+		}
+	}
+
+	public static void runTaskManager(Configuration configuration, ResourceID resourceId) throws Exception {
+		// constructing the runner creates its executor, HA services, RPC service and metric registry
+		final TaskManagerRunner runner = new TaskManagerRunner(configuration, resourceId);
+		runner.start(); // runTaskManager itself does not await the TaskManager's termination
 	}
 
 	// --------------------------------------------------------------------------------------------
 	//  Static utilities
 	// --------------------------------------------------------------------------------------------
 
+	public static TaskExecutor startTaskManager(
+			Configuration configuration,
+			ResourceID resourceID,
+			RpcService rpcService,
+			HighAvailabilityServices highAvailabilityServices,
+			HeartbeatServices heartbeatServices,
+			MetricRegistry metricRegistry,
+			boolean localCommunicationOnly,
+			FatalErrorHandler fatalErrorHandler) throws Exception {
+
+		checkNotNull(configuration);
+		checkNotNull(resourceID);
+		checkNotNull(rpcService);
+		checkNotNull(highAvailabilityServices);
+
+		// the address of the given RPC service is handed to the services configuration as remote address
+		final InetAddress remoteAddress = InetAddress.getByName(rpcService.getAddress());
+
+		final TaskManagerServicesConfiguration servicesConfig = TaskManagerServicesConfiguration.fromConfiguration(
+			configuration,
+			remoteAddress,
+			localCommunicationOnly);
+
+		final TaskManagerServices services = TaskManagerServices.fromConfiguration(
+			servicesConfig,
+			resourceID);
+
+		final TaskManagerConfiguration taskManagerConfig = TaskManagerConfiguration.fromConfiguration(configuration);
+
+		final TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(
+			metricRegistry,
+			services.getTaskManagerLocation().getHostname(),
+			resourceID.toString());
+		// initialize the TaskManager status metrics in the newly created metric group
+		TaskExecutorMetricsInitializer.instantiateStatusMetrics(metricGroup, services.getNetworkEnvironment());
+
+		// note: the returned TaskExecutor has not been started; callers invoke start() on it
+		return new TaskExecutor(
+			rpcService,
+			taskManagerConfig,
+			services.getTaskManagerLocation(),
+			services.getMemoryManager(),
+			services.getIOManager(),
+			services.getNetworkEnvironment(),
+			highAvailabilityServices,
+			heartbeatServices,
+			metricRegistry,
+			metricGroup,
+			services.getBroadcastVariableManager(),
+			services.getFileCache(),
+			services.getTaskSlotTable(),
+			services.getJobManagerTable(),
+			services.getJobLeaderService(),
+			fatalErrorHandler);
+	}
+
 	/**
 	 * Create a RPC service for the task manager.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/40dce290/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index b74a9a3..408cc93 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -567,6 +567,12 @@ object AkkaUtils {
     new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS)
   }
 
+  def getTimeoutAsTime(config: Configuration): Time = {
+    // parse the configured akka ask timeout and express it in milliseconds as a Flink Time
+    val askTimeout = Duration(config.getString(AkkaOptions.ASK_TIMEOUT))
+    Time.milliseconds(askTimeout.toMillis)
+  }
+
   def getDefaultTimeout: Time = {
     val duration = Duration(AkkaOptions.ASK_TIMEOUT.defaultValue())
 

http://git-wip-us.apache.org/repos/asf/flink/blob/40dce290/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
index 2ed4c1d..e0dc55d 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
@@ -25,12 +25,6 @@ import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
-import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.runtime.util.EnvironmentInformation;
@@ -61,14 +55,6 @@ public class YarnTaskExecutorRunner {
 	/** The exit code returned if the initialization of the yarn task executor runner failed. */
 	private static final int INIT_ERROR_EXIT_CODE = 31;
 
-	private MetricRegistry metricRegistry;
-
-	private HighAvailabilityServices haServices;
-
-	private RpcService taskExecutorRpcService;
-
-	private TaskManagerRunner taskManagerRunner;
-
 	// ------------------------------------------------------------------------
 	//  Program entry point
 	// ------------------------------------------------------------------------
@@ -83,20 +69,17 @@ public class YarnTaskExecutorRunner {
 		SignalHandler.register(LOG);
 		JvmShutdownSafeguard.installAsShutdownHook(LOG);
 
-		// run and exit with the proper return code
-		int returnCode = new YarnTaskExecutorRunner().run(args);
-		System.exit(returnCode);
+		run(args);
 	}
 
 	/**
-	 * The instance entry point for the YARN task executor. Obtains user group
-	 * information and calls the main work method {@link #runTaskExecutor(org.apache.flink.configuration.Configuration)} as a
+	 * The instance entry point for the YARN task executor. Obtains user group information and calls
+	 * the main work method {@link TaskManagerRunner#runTaskManager(Configuration, ResourceID)}  as a
 	 * privileged action.
 	 *
 	 * @param args The command line arguments.
-	 * @return The process exit code.
 	 */
-	protected int run(String[] args) {
+	private static void run(String[] args) {
 		try {
 			LOG.debug("All environment variables: {}", ENV);
 
@@ -166,114 +149,30 @@ public class YarnTaskExecutorRunner {
 				configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
 			}
 
-			SecurityUtils.install(sc);
-
-			return SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
-				@Override
-				public Integer call() throws Exception {
-					return runTaskExecutor(configuration);
-				}
-			});
-
-		}
-		catch (Throwable t) {
-			// make sure that everything whatever ends up in the log
-			LOG.error("YARN Application Master initialization failed", t);
-			return INIT_ERROR_EXIT_CODE;
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Core work method
-	// ------------------------------------------------------------------------
-
-	/**
-	 * The main work method, must run as a privileged action.
-	 *
-	 * @return The return code for the Java process.
-	 */
-	protected int runTaskExecutor(Configuration config) {
-
-		try {
-			// ---- (1) create common services
-			// first get the ResouceId, resource id is the container id for yarn.
 			final String containerId = ENV.get(YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID);
 			Preconditions.checkArgument(containerId != null,
-					"ContainerId variable %s not set", YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID);
+				"ContainerId variable %s not set", YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID);
+
 			// use the hostname passed by job manager
 			final String taskExecutorHostname = ENV.get(YarnResourceManager.ENV_FLINK_NODE_ID);
 			if (taskExecutorHostname != null) {
-				config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, taskExecutorHostname);
+				configuration.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, taskExecutorHostname);
 			}
 
-			ResourceID resourceID = new ResourceID(containerId);
-			LOG.info("YARN assigned resource id {} for the task executor.", resourceID.toString());
-
-			taskExecutorRpcService = TaskManagerRunner.createRpcService(config, haServices);
-
-			haServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
-				config,
-				taskExecutorRpcService.getExecutor(),
-				HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
-
-			HeartbeatServices heartbeatServices = HeartbeatServices.fromConfiguration(config);
-
-			metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
-
-			// ---- (2) init task manager runner -------
-			taskManagerRunner = new TaskManagerRunner(
-				config,
-				resourceID,
-				taskExecutorRpcService,
-				haServices,
-				heartbeatServices,
-				metricRegistry);
-
-			// ---- (3) start the task manager runner
-			taskManagerRunner.start();
-			LOG.debug("YARN task executor started");
+			SecurityUtils.install(sc);
 
-			taskManagerRunner.getTerminationFuture().get();
-			// everything started, we can wait until all is done or the process is killed
-			LOG.info("YARN task manager runner finished");
-			shutdown();
+			SecurityUtils.getInstalledContext().runSecured(new Callable<Void>() {
+				@Override
+				public Void call() throws Exception {
+					TaskManagerRunner.runTaskManager(configuration, new ResourceID(containerId));
+					return null;
+				}
+			});
 		}
 		catch (Throwable t) {
 			// make sure that everything whatever ends up in the log
-			LOG.error("YARN task executor initialization failed", t);
-			shutdown();
-			return INIT_ERROR_EXIT_CODE;
+			LOG.error("YARN TaskManager initialization failed.", t);
+			System.exit(INIT_ERROR_EXIT_CODE);
 		}
-
-		return 0;
 	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	protected void shutdown() {
-			if (taskExecutorRpcService != null) {
-				try {
-					taskExecutorRpcService.stopService();
-				} catch (Throwable tt) {
-					LOG.error("Error shutting down job master rpc service", tt);
-				}
-			}
-			if (haServices != null) {
-				try {
-					haServices.close();
-				} catch (Throwable tt) {
-					LOG.warn("Failed to stop the HA service", tt);
-				}
-			}
-			if (metricRegistry != null) {
-				try {
-					metricRegistry.shutdown();
-				} catch (Throwable tt) {
-					LOG.warn("Failed to stop the metrics registry", tt);
-				}
-			}
-	}
-
 }