You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/23 09:18:19 UTC

[5/5] flink git commit: [FLINK-4871] [mini cluster] Add memory calculation for TaskManagers to MiniCluster

[FLINK-4871] [mini cluster] Add memory calculation for TaskManagers to MiniCluster

If the managed memory size for the task manager has not been set in the Configuration, then
it is automatically calculated by dividing the available memory by the number of distributed
components. Additionally, this PR makes it possible to pass a MetricRegistry to the
TaskManagerRunner, so that the MiniCluster's MetricRegistry can be reused.

Add memory calculation for task managers

This closes #2669.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/80b6c2a0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/80b6c2a0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/80b6c2a0

Branch: refs/heads/flip-6
Commit: 80b6c2a015fa12bd73dfe8ccdb3930de0396e623
Parents: 0e965ae
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Oct 20 11:07:08 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Oct 23 11:15:04 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/minicluster/MiniCluster.java  |   7 +-
 .../minicluster/MiniClusterConfiguration.java   |  86 ++++++++++++++-
 .../resourcemanager/JobLeaderIdService.java     |   2 +-
 .../resourcemanager/ResourceManager.java        |   3 +-
 .../resourcemanager/ResourceManagerRunner.java  | 102 ++++++++++++++++++
 .../exceptions/ConfigurationException.java      |   2 +-
 .../exceptions/ResourceManagerRunner.java       | 105 -------------------
 .../runtime/taskexecutor/TaskExecutor.java      |  41 +++-----
 .../runtime/taskexecutor/TaskManagerRunner.java |  28 ++++-
 .../taskexecutor/TaskManagerServices.java       |  28 -----
 .../TaskManagerServicesConfiguration.java       |  19 +---
 .../minicluster/LocalFlinkMiniCluster.scala     |   4 +-
 .../flink/runtime/taskmanager/TaskManager.scala |   5 +-
 13 files changed, 236 insertions(+), 196 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/80b6c2a0/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index b005330..611d4c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
-import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerRunner;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRunner;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
@@ -161,7 +161,7 @@ public class MiniCluster {
 			LOG.info("Starting Flink Mini Cluster");
 			LOG.debug("Using configuration {}", config);
 
-			final Configuration configuration = new UnmodifiableConfiguration(config.getConfiguration());
+			final Configuration configuration = new UnmodifiableConfiguration(config.generateConfiguration());
 			final Time rpcTimeout = config.getRpcTimeout();
 			final int numJobManagers = config.getNumJobManagers();
 			final int numTaskManagers = config.getNumTaskManagers();
@@ -468,7 +468,8 @@ public class MiniCluster {
 				configuration,
 				new ResourceID(UUID.randomUUID().toString()),
 				taskManagerRpcServices[i],
-				haServices);
+				haServices,
+				metricRegistry);
 
 			taskManagerRunners[i].start();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/80b6c2a0/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index cfbbffb..3a03ca3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -20,8 +20,11 @@ package org.apache.flink.runtime.minicluster;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.util.EnvironmentInformation;
 import scala.concurrent.duration.FiniteDuration;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -41,6 +44,8 @@ public class MiniClusterConfiguration {
 
 	private String commonBindAddress;
 
+	private long managedMemoryPerTaskManager = -1;
+
 	// ------------------------------------------------------------------------
 	//  Construction
 	// ------------------------------------------------------------------------
@@ -96,14 +101,15 @@ public class MiniClusterConfiguration {
 		this.commonBindAddress = bindAddress;
 	}
 
+	public void setManagedMemoryPerTaskManager(long managedMemoryPerTaskManager) {
+		checkArgument(managedMemoryPerTaskManager > 0, "must have more than 0 MB of memory for the TaskManager.");
+		this.managedMemoryPerTaskManager = managedMemoryPerTaskManager;
+	}
+
 	// ------------------------------------------------------------------------
 	//  getters
 	// ------------------------------------------------------------------------
 
-	public Configuration getConfiguration() {
-		return config;
-	}
-
 	public boolean getUseSingleRpcSystem() {
 		return singleRpcService;
 	}
@@ -147,10 +153,23 @@ public class MiniClusterConfiguration {
 		return Time.of(duration.length(), duration.unit());
 	}
 
+	public long getManagedMemoryPerTaskManager() {
+		return getOrCalculateManagedMemoryPerTaskManager();
+	}
+
 	// ------------------------------------------------------------------------
 	//  utils
 	// ------------------------------------------------------------------------
 
+	public Configuration generateConfiguration() {
+		Configuration newConfiguration = new Configuration(config);
+		// set the memory
+		long memory = getOrCalculateManagedMemoryPerTaskManager();
+		newConfiguration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memory);
+
+		return newConfiguration;
+	}
+
 	@Override
 	public String toString() {
 		return "MiniClusterConfiguration {" +
@@ -162,4 +181,63 @@ public class MiniClusterConfiguration {
 				", config=" + config +
 				'}';
 	}
+
+	/**
+	 * Get or calculate the managed memory per task manager. The memory is calculated in the
+	 * following order:
+	 *
+	 * 1. Return {@link #managedMemoryPerTaskManager} if set
+	 * 2. Return config.getInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY) if set
+	 * 3. Distribute the available free memory equally among all components (JMs, RMs and TMs) and
+	 * calculate the managed memory from the share of memory for a single task manager.
+	 *
+	 * @return the managed memory per task manager, in megabytes
+	 */
+	private long getOrCalculateManagedMemoryPerTaskManager() {
+		if (managedMemoryPerTaskManager == -1) {
+			// no memory set in the mini cluster configuration
+			final ConfigOption<Integer> memorySizeOption = ConfigOptions
+				.key(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY)
+				.defaultValue(-1);
+
+			int memorySize = config.getInteger(memorySizeOption);
+
+			if (memorySize == -1) {
+				// no memory set in the flink configuration
+				// share the available memory among all running components
+				final ConfigOption<Integer> bufferSizeOption = ConfigOptions
+					.key(ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY)
+					.defaultValue(ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE);
+
+				final ConfigOption<Long> bufferMemoryOption = ConfigOptions
+					.key(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY)
+					.defaultValue((long) ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS);
+
+				final ConfigOption<Float> memoryFractionOption = ConfigOptions
+					.key(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY)
+					.defaultValue(ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION);
+
+				float memoryFraction = config.getFloat(memoryFractionOption);
+				long networkBuffersMemory = config.getLong(bufferMemoryOption) * config.getInteger(bufferSizeOption);
+
+				long freeMemory = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag();
+
+				// we assign each component the same amount of free memory
+				// (might be a bit of overkill for the JMs and RMs)
+				long memoryPerComponent = freeMemory / (numTaskManagers + numResourceManagers + numJobManagers);
+
+				// subtract the network buffer memory
+				long memoryMinusNetworkBuffers = memoryPerComponent - networkBuffersMemory;
+
+				// calculate the managed memory size
+				long managedMemoryBytes = (long) (memoryMinusNetworkBuffers * memoryFraction);
+
+				return managedMemoryBytes >>> 20;
+			} else {
+				return memorySize;
+			}
+		} else {
+			return managedMemoryPerTaskManager;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/80b6c2a0/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
index 6c7e249..56e72c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
@@ -59,7 +59,7 @@ public class JobLeaderIdService {
 	/** Actions to call when the job leader changes */
 	private JobLeaderIdActions jobLeaderIdActions;
 
-	public JobLeaderIdService(HighAvailabilityServices highAvailabilityServices) throws Exception {
+	public JobLeaderIdService(HighAvailabilityServices highAvailabilityServices) {
 		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
 
 		this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry();

http://git-wip-us.apache.org/repos/asf/flink/blob/80b6c2a0/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 7240087..a81c214 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -156,8 +156,9 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 			throw new ResourceManagerException("Could not create the slot manager.", e);
 		}
 
+		leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
+
 		try {
-			leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
 			leaderElectionService.start(this);
 		} catch (Exception e) {
 			throw new ResourceManagerException("Could not start the leader election service.", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/80b6c2a0/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
new file mode 100644
index 0000000..959b727
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple {@link StandaloneResourceManager} runner. It instantiates the resource manager's services
+ * and handles fatal errors by shutting the resource manager down.
+ */
+public class ResourceManagerRunner implements FatalErrorHandler {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ResourceManagerRunner.class);
+
+	private final Object lock = new Object();
+
+	private final ResourceManager<?> resourceManager;
+
+	public ResourceManagerRunner(
+			final Configuration configuration,
+			final RpcService rpcService,
+			final HighAvailabilityServices highAvailabilityServices,
+			final MetricRegistry metricRegistry) throws ConfigurationException {
+
+		Preconditions.checkNotNull(configuration);
+		Preconditions.checkNotNull(rpcService);
+		Preconditions.checkNotNull(highAvailabilityServices);
+		Preconditions.checkNotNull(metricRegistry);
+
+		final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
+		final SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory();
+		final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(highAvailabilityServices);
+
+		this.resourceManager = new StandaloneResourceManager(
+			rpcService,
+			resourceManagerConfiguration,
+			highAvailabilityServices,
+			slotManagerFactory,
+			metricRegistry,
+			jobLeaderIdService,
+			this);
+	}
+
+	//-------------------------------------------------------------------------------------
+	// Lifecycle management
+	//-------------------------------------------------------------------------------------
+
+	public void start() throws Exception {
+		resourceManager.start();
+	}
+
+	public void shutDown() throws Exception {
+		shutDownInternally();
+	}
+
+	private void shutDownInternally() throws Exception {
+		synchronized (lock) {
+			resourceManager.shutDown();
+		}
+	}
+
+	//-------------------------------------------------------------------------------------
+	// Fatal error handler
+	//-------------------------------------------------------------------------------------
+
+	@Override
+	public void onFatalError(Throwable exception) {
+		LOG.error("Encountered fatal error.", exception);
+
+		try {
+			shutDownInternally();
+		} catch (Exception e) {
+			LOG.error("Could not properly shut down the resource manager.", e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/80b6c2a0/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ConfigurationException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ConfigurationException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ConfigurationException.java
index f081fff..0007318 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ConfigurationException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ConfigurationException.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.resourcemanager.exceptions;
 
 /**
- * Base class for configuration related exception which occur when creating a configuration.
+ * Exception which occurs when creating a configuration object fails.
  */
 public class ConfigurationException extends Exception {
 	private static final long serialVersionUID = 3971647332059381556L;

http://git-wip-us.apache.org/repos/asf/flink/blob/80b6c2a0/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ResourceManagerRunner.java
deleted file mode 100644
index 0c7e4e4..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ResourceManagerRunner.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.exceptions;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
-import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
-import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Simple {@link StandaloneResourceManager} runner. It instantiates the resource manager's services
- * and handles fatal errors by shutting the resource manager down.
- */
-public class ResourceManagerRunner implements FatalErrorHandler {
-
-	private static final Logger LOG = LoggerFactory.getLogger(ResourceManagerRunner.class);
-
-	private final Object lock = new Object();
-
-	private final ResourceManager<?> resourceManager;
-
-	public ResourceManagerRunner(
-			final Configuration configuration,
-			final RpcService rpcService,
-			final HighAvailabilityServices highAvailabilityServices,
-			final MetricRegistry metricRegistry) throws Exception {
-
-		Preconditions.checkNotNull(configuration);
-		Preconditions.checkNotNull(rpcService);
-		Preconditions.checkNotNull(highAvailabilityServices);
-		Preconditions.checkNotNull(metricRegistry);
-
-		final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
-		final SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory();
-		final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(highAvailabilityServices);
-
-		this.resourceManager = new StandaloneResourceManager(
-			rpcService,
-			resourceManagerConfiguration,
-			highAvailabilityServices,
-			slotManagerFactory,
-			metricRegistry,
-			jobLeaderIdService,
-			this);
-	}
-
-	//-------------------------------------------------------------------------------------
-	// Lifecycle management
-	//-------------------------------------------------------------------------------------
-
-	public void start() throws Exception {
-		resourceManager.start();
-	}
-
-	public void shutDown() throws Exception {
-		shutDownInternally();
-	}
-
-	private void shutDownInternally() throws Exception {
-		synchronized (lock) {
-			resourceManager.shutDown();
-		}
-	}
-
-	//-------------------------------------------------------------------------------------
-	// Fatal error handler
-	//-------------------------------------------------------------------------------------
-
-	@Override
-	public void onFatalError(Throwable exception) {
-		LOG.error("Encountered fatal error.", exception);
-
-		try {
-			shutDownInternally();
-		} catch (Exception e) {
-			LOG.error("Could not properly shut down the resource manager.", e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/80b6c2a0/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index aea926c..5eb8e6a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -78,6 +78,7 @@ import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -215,47 +216,33 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	 * Called to shut down the TaskManager. The method closes all TaskManager services.
 	 */
 	@Override
-	public void shutDown() {
+	public void shutDown() throws Exception {
 		log.info("Stopping TaskManager {}.", getAddress());
 
+		Exception exception = null;
+
 		taskSlotTable.stop();
 
 		if (isConnectedToResourceManager()) {
-			try {
-				resourceManagerConnection.close();
-			} catch (Exception e) {
-				log.error("Could not cleanly close the ResourceManager connection.", e);
-			}
+			resourceManagerConnection.close();
 		}
 
-		try {
-			ioManager.shutdown();
-		} catch (Exception e) {
-			log.error("IOManager did not shut down properly.", e);
-		}
+		ioManager.shutdown();
 
-		try {
-			memoryManager.shutdown();
-		} catch (Exception e) {
-			log.error("MemoryManager did not shut down properly.", e);
-		}
+		memoryManager.shutdown();
 
-		try {
-			networkEnvironment.shutdown();
-		} catch (Exception e) {
-			log.error("Network environment did not shut down properly.", e);
-		}
+		networkEnvironment.shutdown();
+
+		fileCache.shutdown();
 
 		try {
-			fileCache.shutdown();
+			super.shutDown();
 		} catch (Exception e) {
-			log.error("File cache did not shut down properly.", e);
+			exception = ExceptionUtils.firstOrSuppressed(e, exception);
 		}
 
-		try {
-			metricRegistry.shutdown();
-		} catch (Exception e) {
-			log.error("MetricRegistry did not shut down properly.", e);
+		if (exception != null) {
+			ExceptionUtils.rethrowException(exception, "Error while shutting the TaskExecutor down.");
 		}
 
 		log.info("Stopped TaskManager {}.", getAddress());

http://git-wip-us.apache.org/repos/asf/flink/blob/80b6c2a0/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 7d9ee55..99a7c5d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -24,9 +24,12 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcServiceUtils;
+import org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 
 import org.apache.flink.util.Preconditions;
@@ -66,7 +69,8 @@ public class TaskManagerRunner implements FatalErrorHandler {
 		Configuration configuration,
 		ResourceID resourceID,
 		RpcService rpcService,
-		HighAvailabilityServices highAvailabilityServices) throws Exception {
+		HighAvailabilityServices highAvailabilityServices,
+		MetricRegistry metricRegistry) throws Exception {
 
 		this.configuration = Preconditions.checkNotNull(configuration);
 		this.resourceID = Preconditions.checkNotNull(resourceID);
@@ -81,10 +85,20 @@ public class TaskManagerRunner implements FatalErrorHandler {
 			remoteAddress,
 			false);
 
-		TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(taskManagerServicesConfiguration, resourceID);
+		TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
+			taskManagerServicesConfiguration,
+			resourceID);
 
 		TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
 
+		TaskManagerMetricGroup taskManagerMetricGroup = new TaskManagerMetricGroup(
+			metricRegistry,
+			taskManagerServices.getTaskManagerLocation().getHostname(),
+			resourceID.toString());
+
+		// Initialize the TM metrics
+		TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, taskManagerServices.getNetworkEnvironment());
+
 		this.taskManager = new TaskExecutor(
 			taskManagerConfiguration,
 			taskManagerServices.getTaskManagerLocation(),
@@ -93,8 +107,8 @@ public class TaskManagerRunner implements FatalErrorHandler {
 			taskManagerServices.getIOManager(),
 			taskManagerServices.getNetworkEnvironment(),
 			highAvailabilityServices,
-			taskManagerServices.getMetricRegistry(),
-			taskManagerServices.getTaskManagerMetricGroup(),
+			metricRegistry,
+			taskManagerMetricGroup,
 			taskManagerServices.getBroadcastVariableManager(),
 			taskManagerServices.getFileCache(),
 			taskManagerServices.getTaskSlotTable(),
@@ -117,7 +131,11 @@ public class TaskManagerRunner implements FatalErrorHandler {
 
 	protected void shutDownInternally() {
 		synchronized(lock) {
-			taskManager.shutDown();
+			try {
+				taskManager.shutDown();
+			} catch (Exception e) {
+				LOG.error("Could not properly shut down the task manager.", e);
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/80b6c2a0/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index e8de1b1..7966078 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -36,13 +36,11 @@ import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
 import org.apache.flink.runtime.query.netty.KvStateServer;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
-import org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.EnvironmentInformation;
@@ -68,8 +66,6 @@ public class TaskManagerServices {
 	private final MemoryManager memoryManager;
 	private final IOManager ioManager;
 	private final NetworkEnvironment networkEnvironment;
-	private final MetricRegistry metricRegistry;
-	private final TaskManagerMetricGroup taskManagerMetricGroup;
 	private final BroadcastVariableManager broadcastVariableManager;
 	private final FileCache fileCache;
 	private final TaskSlotTable taskSlotTable;
@@ -81,8 +77,6 @@ public class TaskManagerServices {
 		MemoryManager memoryManager,
 		IOManager ioManager,
 		NetworkEnvironment networkEnvironment,
-		MetricRegistry metricRegistry,
-		TaskManagerMetricGroup taskManagerMetricGroup,
 		BroadcastVariableManager broadcastVariableManager,
 		FileCache fileCache,
 		TaskSlotTable taskSlotTable,
@@ -93,8 +87,6 @@ public class TaskManagerServices {
 		this.memoryManager = Preconditions.checkNotNull(memoryManager);
 		this.ioManager = Preconditions.checkNotNull(ioManager);
 		this.networkEnvironment = Preconditions.checkNotNull(networkEnvironment);
-		this.metricRegistry = Preconditions.checkNotNull(metricRegistry);
-		this.taskManagerMetricGroup = Preconditions.checkNotNull(taskManagerMetricGroup);
 		this.broadcastVariableManager = Preconditions.checkNotNull(broadcastVariableManager);
 		this.fileCache = Preconditions.checkNotNull(fileCache);
 		this.taskSlotTable = Preconditions.checkNotNull(taskSlotTable);
@@ -122,14 +114,6 @@ public class TaskManagerServices {
 		return taskManagerLocation;
 	}
 
-	public MetricRegistry getMetricRegistry() {
-		return metricRegistry;
-	}
-
-	public TaskManagerMetricGroup getTaskManagerMetricGroup() {
-		return taskManagerMetricGroup;
-	}
-
 	public BroadcastVariableManager getBroadcastVariableManager() {
 		return broadcastVariableManager;
 	}
@@ -181,16 +165,6 @@ public class TaskManagerServices {
 		// start the I/O manager, it will create some temp directories.
 		final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
 
-		final MetricRegistry metricRegistry = new MetricRegistry(taskManagerServicesConfiguration.getMetricRegistryConfiguration());
-
-		final TaskManagerMetricGroup taskManagerMetricGroup = new TaskManagerMetricGroup(
-			metricRegistry,
-			taskManagerLocation.getHostname(),
-			taskManagerLocation.getResourceID().toString());
-
-		// Initialize the TM metrics
-		TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, network);
-
 		final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();
 
 		final FileCache fileCache = new FileCache(taskManagerServicesConfiguration.getTmpDirPaths());
@@ -214,8 +188,6 @@ public class TaskManagerServices {
 			memoryManager,
 			ioManager,
 			network,
-			metricRegistry,
-			taskManagerMetricGroup,
 			broadcastVariableManager,
 			fileCache,
 			taskSlotTable,

http://git-wip-us.apache.org/repos/asf/flink/blob/80b6c2a0/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 036a890..702142f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -28,7 +28,6 @@ import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.util.MathUtils;
 
@@ -58,8 +57,6 @@ public class TaskManagerServicesConfiguration {
 
 	private final float memoryFraction;
 
-	private final MetricRegistryConfiguration metricRegistryConfiguration;
-
 	public TaskManagerServicesConfiguration(
 		InetAddress taskManagerAddress,
 		String[] tmpDirPaths,
@@ -67,8 +64,7 @@ public class TaskManagerServicesConfiguration {
 		int numberOfSlots,
 		long configuredMemory,
 		boolean preAllocateMemory,
-		float memoryFraction,
-		MetricRegistryConfiguration metricRegistryConfiguration) {
+		float memoryFraction) {
 
 		this.taskManagerAddress = checkNotNull(taskManagerAddress);
 		this.tmpDirPaths = checkNotNull(tmpDirPaths);
@@ -78,8 +74,6 @@ public class TaskManagerServicesConfiguration {
 		this.configuredMemory = configuredMemory;
 		this.preAllocateMemory = preAllocateMemory;
 		this.memoryFraction = memoryFraction;
-
-		this.metricRegistryConfiguration = checkNotNull(metricRegistryConfiguration);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -113,10 +107,6 @@ public class TaskManagerServicesConfiguration {
 		return preAllocateMemory;
 	}
 
-	public MetricRegistryConfiguration getMetricRegistryConfiguration() {
-		return metricRegistryConfiguration;
-	}
-
 	// --------------------------------------------------------------------------------------------
 	//  Parsing of Flink configuration
 	// --------------------------------------------------------------------------------------------
@@ -171,10 +161,6 @@ public class TaskManagerServicesConfiguration {
 			ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
 			"MemoryManager fraction of the free memory must be between 0.0 and 1.0");
 
-		final MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(configuration);
-
-
-
 		return new TaskManagerServicesConfiguration(
 			remoteAddress,
 			tmpDirs,
@@ -182,8 +168,7 @@ public class TaskManagerServicesConfiguration {
 			slots,
 			configuredMemory,
 			preAllocateMemory,
-			memoryFraction,
-			metricRegistryConfiguration);
+			memoryFraction);
 	}
 
 	// --------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/80b6c2a0/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index a5e229b..a404472 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -42,7 +42,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.messages.JobManagerMessages
 import org.apache.flink.runtime.messages.JobManagerMessages.{RunningJobsStatus, StoppingFailure, StoppingResponse}
-import org.apache.flink.runtime.metrics.MetricRegistry
+import org.apache.flink.runtime.metrics.{MetricRegistry, MetricRegistryConfiguration}
 import org.apache.flink.runtime.taskexecutor.{TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration}
 import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
 import org.apache.flink.runtime.util.{EnvironmentInformation, LeaderRetrievalUtils}
@@ -209,7 +209,7 @@ class LocalFlinkMiniCluster(
       taskManagerServicesConfiguration,
       resourceID)
 
-    val metricRegistry = taskManagerServices.getMetricRegistry()
+    val metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config))
 
     val props = getTaskManagerProps(
       taskManagerClass,

http://git-wip-us.apache.org/repos/asf/flink/blob/80b6c2a0/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 1b56c92..993d128 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -62,7 +62,7 @@ import org.apache.flink.runtime.messages.StackTraceSampleMessages._
 import org.apache.flink.runtime.messages.TaskManagerMessages._
 import org.apache.flink.runtime.messages.TaskMessages._
 import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, NotifyCheckpointComplete, TriggerCheckpoint}
-import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
+import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistry => FlinkMetricRegistry}
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.security.SecurityContext.{FlinkSecuredRunner, SecurityConfiguration}
@@ -1866,7 +1866,8 @@ object TaskManager {
       taskManagerServicesConfiguration,
       resourceID)
 
-    val metricRegistry = taskManagerServices.getMetricRegistry()
+    val metricRegistry = new FlinkMetricRegistry(
+      MetricRegistryConfiguration.fromConfiguration(configuration))
 
     val leaderRetrievalService = leaderRetrievalServiceOption match {
       case Some(lrs) => lrs