You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:33 UTC

[14/52] [abbrv] flink git commit: [FLINK-4871] [mini cluster] Add memory calculation for TaskManagers to MiniCluster

[FLINK-4871] [mini cluster] Add memory calculation for TaskManagers to MiniCluster

If the managed memory size for the TaskManagers has not been set in the Configuration, it is
now calculated automatically by dividing the available free heap memory evenly among all
distributed components (JobManagers, ResourceManagers and TaskManagers). Additionally, this PR
makes it possible to pass a MetricRegistry to the TaskManagerRunner, so that the TaskManagers
can use the MiniCluster's MetricRegistry.

Add memory calculation for task managers

This closes #2669.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d8b22e04
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d8b22e04
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d8b22e04

Branch: refs/heads/master
Commit: d8b22e0448216ae54d0f0cf0ece3f2afe71f2593
Parents: c001bec
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Oct 20 11:07:08 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:24 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/minicluster/MiniCluster.java  |   7 +-
 .../minicluster/MiniClusterConfiguration.java   |  86 ++++++++++++++-
 .../resourcemanager/JobLeaderIdService.java     |   2 +-
 .../resourcemanager/ResourceManager.java        |   3 +-
 .../resourcemanager/ResourceManagerRunner.java  | 102 ++++++++++++++++++
 .../exceptions/ConfigurationException.java      |   2 +-
 .../exceptions/ResourceManagerRunner.java       | 105 -------------------
 .../runtime/taskexecutor/TaskExecutor.java      |  41 +++-----
 .../runtime/taskexecutor/TaskManagerRunner.java |  28 ++++-
 9 files changed, 229 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d8b22e04/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index b005330..611d4c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
-import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerRunner;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRunner;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
@@ -161,7 +161,7 @@ public class MiniCluster {
 			LOG.info("Starting Flink Mini Cluster");
 			LOG.debug("Using configuration {}", config);
 
-			final Configuration configuration = new UnmodifiableConfiguration(config.getConfiguration());
+			final Configuration configuration = new UnmodifiableConfiguration(config.generateConfiguration());
 			final Time rpcTimeout = config.getRpcTimeout();
 			final int numJobManagers = config.getNumJobManagers();
 			final int numTaskManagers = config.getNumTaskManagers();
@@ -468,7 +468,8 @@ public class MiniCluster {
 				configuration,
 				new ResourceID(UUID.randomUUID().toString()),
 				taskManagerRpcServices[i],
-				haServices);
+				haServices,
+				metricRegistry);
 
 			taskManagerRunners[i].start();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/d8b22e04/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index cfbbffb..3a03ca3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -20,8 +20,11 @@ package org.apache.flink.runtime.minicluster;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.util.EnvironmentInformation;
 import scala.concurrent.duration.FiniteDuration;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -41,6 +44,8 @@ public class MiniClusterConfiguration {
 
 	private String commonBindAddress;
 
+	private long managedMemoryPerTaskManager = -1;
+
 	// ------------------------------------------------------------------------
 	//  Construction
 	// ------------------------------------------------------------------------
@@ -96,14 +101,15 @@ public class MiniClusterConfiguration {
 		this.commonBindAddress = bindAddress;
 	}
 
+	public void setManagedMemoryPerTaskManager(long managedMemoryPerTaskManager) { // in MB; takes precedence over the Flink configuration
+		checkArgument(managedMemoryPerTaskManager > 0, "must have more than 0 MB of memory for the TaskManager.");
+		this.managedMemoryPerTaskManager = managedMemoryPerTaskManager; // replaces the -1 "not set" marker
+	}
+
 	// ------------------------------------------------------------------------
 	//  getters
 	// ------------------------------------------------------------------------
 
-	public Configuration getConfiguration() {
-		return config;
-	}
-
 	public boolean getUseSingleRpcSystem() {
 		return singleRpcService;
 	}
@@ -147,10 +153,23 @@ public class MiniClusterConfiguration {
 		return Time.of(duration.length(), duration.unit());
 	}
 
+	public long getManagedMemoryPerTaskManager() { // in MB; when derived, it is recomputed from the current free heap on every call
+		return getOrCalculateManagedMemoryPerTaskManager();
+	}
+
 	// ------------------------------------------------------------------------
 	//  utils
 	// ------------------------------------------------------------------------
 
+	public Configuration generateConfiguration() { // the Configuration the mini cluster is actually started with
+		Configuration newConfiguration = new Configuration(config); // work on a copy; config itself is not modified
+		// pin the resolved managed memory size (MB) so every TaskManager is started with the same value
+		long memory = getOrCalculateManagedMemoryPerTaskManager();
+		newConfiguration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memory);
+
+		return newConfiguration;
+	}
+
 	@Override
 	public String toString() {
 		return "MiniClusterConfiguration {" +
@@ -162,4 +181,63 @@ public class MiniClusterConfiguration {
 				", config=" + config +
 				'}';
 	}
+
+	/**
+	 * Gets or calculates the managed memory per task manager, in the following order:
+	 *
+	 * 1. Return {@link #managedMemoryPerTaskManager} if set
+	 * 2. Return config.getInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY) if set
+	 * 3. Distribute the available free heap memory equally among all components (JMs, RMs and TMs)
+	 * and use the configured memory fraction of one share minus the network buffer memory.
+	 *
+	 * @return the managed memory per task manager in MB
+	 * @throws IllegalStateException if the calculated amount (case 3) is less than 1 MB
+	 */
+	private long getOrCalculateManagedMemoryPerTaskManager() {
+		if (managedMemoryPerTaskManager == -1) {
+			// no memory set in the mini cluster configuration
+			final ConfigOption<Integer> memorySizeOption = ConfigOptions
+				.key(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY)
+				.defaultValue(-1);
+
+			int memorySize = config.getInteger(memorySizeOption);
+
+			if (memorySize == -1) {
+				// no memory set in the flink configuration: share the free heap among all running components
+				final ConfigOption<Integer> bufferSizeOption = ConfigOptions
+					.key(ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY)
+					.defaultValue(ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE);
+
+				final ConfigOption<Long> numBuffersOption = ConfigOptions
+					.key(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY)
+					.defaultValue((long) ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS);
+
+				final ConfigOption<Float> memoryFractionOption = ConfigOptions
+					.key(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY)
+					.defaultValue(ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION);
+
+				float memoryFraction = config.getFloat(memoryFractionOption);
+				long networkBuffersMemory = config.getLong(numBuffersOption) * config.getInteger(bufferSizeOption);
+
+				long freeMemory = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag();
+
+				// every component gets the same share (might be a bit of overkill for the JMs and RMs)
+				long memoryPerComponent = freeMemory / (numTaskManagers + numResourceManagers + numJobManagers);
+
+				// managed memory = configured fraction of the share left after the network buffers
+				long managedMemoryBytes = (long) ((memoryPerComponent - networkBuffersMemory) * memoryFraction);
+
+				// signed shift, so a negative size stays negative (an unsigned shift would make it huge)
+				long managedMemoryMegaBytes = managedMemoryBytes >> 20;
+				if (managedMemoryMegaBytes <= 0) {
+					throw new IllegalStateException("Not enough free heap memory (" + freeMemory + " bytes) to give each TaskManager at least 1 MB of managed memory.");
+				}
+				return managedMemoryMegaBytes;
+			} else {
+				return memorySize;
+			}
+		} else {
+			return managedMemoryPerTaskManager;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d8b22e04/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
index 6c7e249..56e72c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
@@ -59,7 +59,7 @@ public class JobLeaderIdService {
 	/** Actions to call when the job leader changes */
 	private JobLeaderIdActions jobLeaderIdActions;
 
-	public JobLeaderIdService(HighAvailabilityServices highAvailabilityServices) throws Exception {
+	public JobLeaderIdService(HighAvailabilityServices highAvailabilityServices) {
 		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
 
 		this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry();

http://git-wip-us.apache.org/repos/asf/flink/blob/d8b22e04/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 7240087..a81c214 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -156,8 +156,9 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 			throw new ResourceManagerException("Could not create the slot manager.", e);
 		}
 
+		leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
+
 		try {
-			leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
 			leaderElectionService.start(this);
 		} catch (Exception e) {
 			throw new ResourceManagerException("Could not start the leader election service.", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/d8b22e04/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
new file mode 100644
index 0000000..959b727
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple {@link StandaloneResourceManager} runner. It instantiates the resource manager's services
+ * and handles fatal errors by shutting the resource manager down.
+ */
+public class ResourceManagerRunner implements FatalErrorHandler {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ResourceManagerRunner.class);
+
+	private final Object lock = new Object(); // serializes shutdowns triggered by shutDown() and onFatalError()
+
+	private final ResourceManager<?> resourceManager;
+
+	public ResourceManagerRunner(
+			final Configuration configuration,
+			final RpcService rpcService,
+			final HighAvailabilityServices highAvailabilityServices,
+			final MetricRegistry metricRegistry) throws ConfigurationException { // all arguments must be non-null
+
+		Preconditions.checkNotNull(configuration);
+		Preconditions.checkNotNull(rpcService);
+		Preconditions.checkNotNull(highAvailabilityServices);
+		Preconditions.checkNotNull(metricRegistry);
+
+		final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration); // presumably the source of ConfigurationException
+		final SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory();
+		final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(highAvailabilityServices);
+
+		this.resourceManager = new StandaloneResourceManager(
+			rpcService,
+			resourceManagerConfiguration,
+			highAvailabilityServices,
+			slotManagerFactory,
+			metricRegistry,
+			jobLeaderIdService,
+			this); // presumably the FatalErrorHandler argument; see onFatalError
+	}
+
+	//-------------------------------------------------------------------------------------
+	// Lifecycle management
+	//-------------------------------------------------------------------------------------
+
+	public void start() throws Exception {
+		resourceManager.start();
+	}
+
+	public void shutDown() throws Exception { // exceptions from the resource manager's shutdown are propagated
+		shutDownInternally();
+	}
+
+	private void shutDownInternally() throws Exception {
+		synchronized (lock) {
+			resourceManager.shutDown();
+		}
+	}
+
+	//-------------------------------------------------------------------------------------
+	// Fatal error handler
+	//-------------------------------------------------------------------------------------
+
+	@Override
+	public void onFatalError(Throwable exception) {
+		LOG.error("Encountered fatal error.", exception);
+
+		try {
+			shutDownInternally();
+		} catch (Exception e) { // best effort: a failed shutdown is only logged, never rethrown
+			LOG.error("Could not properly shut down the resource manager.", e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d8b22e04/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ConfigurationException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ConfigurationException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ConfigurationException.java
index f081fff..0007318 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ConfigurationException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ConfigurationException.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.resourcemanager.exceptions;
 
 /**
- * Base class for configuration related exception which occur when creating a configuration.
+ * Exception which occurs when creating a configuration object fails.
  */
 public class ConfigurationException extends Exception {
 	private static final long serialVersionUID = 3971647332059381556L;

http://git-wip-us.apache.org/repos/asf/flink/blob/d8b22e04/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ResourceManagerRunner.java
deleted file mode 100644
index 0c7e4e4..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ResourceManagerRunner.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.exceptions;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
-import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
-import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Simple {@link StandaloneResourceManager} runner. It instantiates the resource manager's services
- * and handles fatal errors by shutting the resource manager down.
- */
-public class ResourceManagerRunner implements FatalErrorHandler {
-
-	private static final Logger LOG = LoggerFactory.getLogger(ResourceManagerRunner.class);
-
-	private final Object lock = new Object();
-
-	private final ResourceManager<?> resourceManager;
-
-	public ResourceManagerRunner(
-			final Configuration configuration,
-			final RpcService rpcService,
-			final HighAvailabilityServices highAvailabilityServices,
-			final MetricRegistry metricRegistry) throws Exception {
-
-		Preconditions.checkNotNull(configuration);
-		Preconditions.checkNotNull(rpcService);
-		Preconditions.checkNotNull(highAvailabilityServices);
-		Preconditions.checkNotNull(metricRegistry);
-
-		final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
-		final SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory();
-		final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(highAvailabilityServices);
-
-		this.resourceManager = new StandaloneResourceManager(
-			rpcService,
-			resourceManagerConfiguration,
-			highAvailabilityServices,
-			slotManagerFactory,
-			metricRegistry,
-			jobLeaderIdService,
-			this);
-	}
-
-	//-------------------------------------------------------------------------------------
-	// Lifecycle management
-	//-------------------------------------------------------------------------------------
-
-	public void start() throws Exception {
-		resourceManager.start();
-	}
-
-	public void shutDown() throws Exception {
-		shutDownInternally();
-	}
-
-	private void shutDownInternally() throws Exception {
-		synchronized (lock) {
-			resourceManager.shutDown();
-		}
-	}
-
-	//-------------------------------------------------------------------------------------
-	// Fatal error handler
-	//-------------------------------------------------------------------------------------
-
-	@Override
-	public void onFatalError(Throwable exception) {
-		LOG.error("Encountered fatal error.", exception);
-
-		try {
-			shutDownInternally();
-		} catch (Exception e) {
-			LOG.error("Could not properly shut down the resource manager.", e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d8b22e04/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 8187fde..b981829 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -80,6 +80,7 @@ import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -217,47 +218,33 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	 * Called to shut down the TaskManager. The method closes all TaskManager services.
 	 */
 	@Override
-	public void shutDown() {
+	public void shutDown() throws Exception {
 		log.info("Stopping TaskManager {}.", getAddress());
 
+		Exception exception = null;
+
 		taskSlotTable.stop();
 
 		if (isConnectedToResourceManager()) {
-			try {
-				resourceManagerConnection.close();
-			} catch (Exception e) {
-				log.error("Could not cleanly close the ResourceManager connection.", e);
-			}
+			resourceManagerConnection.close();
 		}
 
-		try {
-			ioManager.shutdown();
-		} catch (Exception e) {
-			log.error("IOManager did not shut down properly.", e);
-		}
+		ioManager.shutdown();
 
-		try {
-			memoryManager.shutdown();
-		} catch (Exception e) {
-			log.error("MemoryManager did not shut down properly.", e);
-		}
+		memoryManager.shutdown();
 
-		try {
-			networkEnvironment.shutdown();
-		} catch (Exception e) {
-			log.error("Network environment did not shut down properly.", e);
-		}
+		networkEnvironment.shutdown();
+
+		fileCache.shutdown();
 
 		try {
-			fileCache.shutdown();
+			super.shutDown();
 		} catch (Exception e) {
-			log.error("File cache did not shut down properly.", e);
+			exception = ExceptionUtils.firstOrSuppressed(e, exception);
 		}
 
-		try {
-			metricRegistry.shutdown();
-		} catch (Exception e) {
-			log.error("MetricRegistry did not shut down properly.", e);
+		if (exception != null) {
+			ExceptionUtils.rethrowException(exception, "Error while shutting the TaskExecutor down.");
 		}
 
 		log.info("Stopped TaskManager {}.", getAddress());

http://git-wip-us.apache.org/repos/asf/flink/blob/d8b22e04/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 7d9ee55..99a7c5d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -24,9 +24,12 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcServiceUtils;
+import org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 
 import org.apache.flink.util.Preconditions;
@@ -66,7 +69,8 @@ public class TaskManagerRunner implements FatalErrorHandler {
 		Configuration configuration,
 		ResourceID resourceID,
 		RpcService rpcService,
-		HighAvailabilityServices highAvailabilityServices) throws Exception {
+		HighAvailabilityServices highAvailabilityServices,
+		MetricRegistry metricRegistry) throws Exception {
 
 		this.configuration = Preconditions.checkNotNull(configuration);
 		this.resourceID = Preconditions.checkNotNull(resourceID);
@@ -81,10 +85,20 @@ public class TaskManagerRunner implements FatalErrorHandler {
 			remoteAddress,
 			false);
 
-		TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(taskManagerServicesConfiguration, resourceID);
+		TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
+			taskManagerServicesConfiguration,
+			resourceID);
 
 		TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
 
+		TaskManagerMetricGroup taskManagerMetricGroup = new TaskManagerMetricGroup(
+			metricRegistry,
+			taskManagerServices.getTaskManagerLocation().getHostname(),
+			resourceID.toString());
+
+		// Initialize the TM metrics
+		TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, taskManagerServices.getNetworkEnvironment());
+
 		this.taskManager = new TaskExecutor(
 			taskManagerConfiguration,
 			taskManagerServices.getTaskManagerLocation(),
@@ -93,8 +107,8 @@ public class TaskManagerRunner implements FatalErrorHandler {
 			taskManagerServices.getIOManager(),
 			taskManagerServices.getNetworkEnvironment(),
 			highAvailabilityServices,
-			taskManagerServices.getMetricRegistry(),
-			taskManagerServices.getTaskManagerMetricGroup(),
+			metricRegistry,
+			taskManagerMetricGroup,
 			taskManagerServices.getBroadcastVariableManager(),
 			taskManagerServices.getFileCache(),
 			taskManagerServices.getTaskSlotTable(),
@@ -117,7 +131,11 @@ public class TaskManagerRunner implements FatalErrorHandler {
 
 	protected void shutDownInternally() {
 		synchronized(lock) {
-			taskManager.shutDown();
+			try {
+				taskManager.shutDown();
+			} catch (Exception e) {
+				LOG.error("Could not properly shut down the task manager.", e);
+			}
 		}
 	}