You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/01 22:27:43 UTC

[3/7] flink git commit: [FLINK-7876] Register TaskManagerMetricGroup under ResourceID

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
new file mode 100644
index 0000000..407fa8b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.View;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Kill;
+import akka.pattern.Patterns;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * A MetricRegistry keeps track of all registered {@link Metric Metrics}. It serves as the
+ * connection between {@link MetricGroup MetricGroups} and {@link MetricReporter MetricReporters}.
+ */
+public class MetricRegistryImpl implements MetricRegistry {
+	static final Logger LOG = LoggerFactory.getLogger(MetricRegistryImpl.class);
+
+	private final Object lock = new Object();
+
+	private List<MetricReporter> reporters;
+	private ScheduledExecutorService executor;
+
+	@Nullable
+	private ActorRef queryService;
+
+	@Nullable
+	private String metricQueryServicePath;
+
+	private ViewUpdater viewUpdater;
+
+	private final ScopeFormats scopeFormats;
+	private final char globalDelimiter;
+	private final List<Character> delimiters = new ArrayList<>();
+
+	/**
+	 * Creates a new MetricRegistry and starts the configured reporters.
+	 *
+	 * <p>For every reporter configuration the reporter class is instantiated reflectively and opened
+	 * with its configuration. Reporters implementing {@link Scheduled} are additionally scheduled on
+	 * the registry's executor. A reporter that cannot be set up is logged and skipped.
+	 *
+	 * @param config configuration supplying the scope formats, the global delimiter and the reporter configurations
+	 */
+	public MetricRegistryImpl(MetricRegistryConfiguration config) {
+		this.scopeFormats = config.getScopeFormats();
+		this.globalDelimiter = config.getDelimiter();
+
+		// instantiate any custom configured reporters
+		this.reporters = new ArrayList<>();
+
+		List<Tuple2<String, Configuration>> reporterConfigurations = config.getReporterConfigurations();
+
+		this.executor = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-MetricRegistry"));
+
+		this.queryService = null;
+		this.metricQueryServicePath = null;
+
+		if (reporterConfigurations.isEmpty()) {
+			// no reporters defined
+			// by default, don't report anything
+			LOG.info("No metrics reporter configured, no metrics will be exposed/reported.");
+		} else {
+			// we have some reporters, so set up each of them
+			for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) {
+				String namedReporter = reporterConfiguration.f0;
+				Configuration reporterConfig = reporterConfiguration.f1;
+
+				final String className = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
+				if (className == null) {
+					LOG.error("No reporter class set for reporter " + namedReporter + ". Metrics might not be exposed/reported.");
+					continue;
+				}
+
+				try {
+					String configuredPeriod = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, null);
+					// default reporting interval: 10 seconds
+					TimeUnit timeunit = TimeUnit.SECONDS;
+					long period = 10;
+
+					if (configuredPeriod != null) {
+						try {
+							// expected format: "<number> <TimeUnit constant>", e.g. "10 SECONDS"
+							String[] interval = configuredPeriod.split(" ");
+							period = Long.parseLong(interval[0]);
+							timeunit = TimeUnit.valueOf(interval[1]);
+						}
+						catch (Exception e) {
+							LOG.error("Cannot parse report interval from config: " + configuredPeriod +
+									" - please use values like '10 SECONDS' or '500 MILLISECONDS'. " +
+									"Using default reporting interval.");
+						}
+					}
+
+					Class<?> reporterClass = Class.forName(className);
+					MetricReporter reporterInstance = (MetricReporter) reporterClass.newInstance();
+
+					MetricConfig metricConfig = new MetricConfig();
+					reporterConfig.addAllToProperties(metricConfig);
+					LOG.info("Configuring {} with {}.", reporterClass.getSimpleName(), metricConfig);
+					reporterInstance.open(metricConfig);
+
+					if (reporterInstance instanceof Scheduled) {
+						LOG.info("Periodically reporting metrics in intervals of {} {} for reporter {} of type {}.", period, timeunit.name(), namedReporter, className);
+
+						executor.scheduleWithFixedDelay(
+								new MetricRegistryImpl.ReporterTask((Scheduled) reporterInstance), period, period, timeunit);
+					} else {
+						LOG.info("Reporting metrics for reporter {} of type {}.", namedReporter, className);
+					}
+					reporters.add(reporterInstance);
+
+					// the delimiter is appended right after its reporter, so that both lists can be
+					// addressed with the same reporter index (see getDelimiter(int))
+					String delimiterForReporter = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, String.valueOf(globalDelimiter));
+					if (delimiterForReporter.length() != 1) {
+						LOG.warn("Failed to parse delimiter '{}' for reporter '{}', using global delimiter '{}'.", delimiterForReporter, namedReporter, globalDelimiter);
+						delimiterForReporter = String.valueOf(globalDelimiter);
+					}
+					this.delimiters.add(delimiterForReporter.charAt(0));
+				}
+				catch (Throwable t) {
+					// a failing reporter is only logged; the remaining reporters are still set up
+					LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", namedReporter, t);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Initializes the MetricQueryService.
+	 *
+	 * <p>The registry must not have been shut down yet. If the service cannot be started, the failure
+	 * is logged as a warning and not propagated to the caller.
+	 *
+	 * @param actorSystem ActorSystem to create the MetricQueryService on
+	 * @param resourceID resource ID used to disambiguate the actor name
+	 */
+	public void startQueryService(ActorSystem actorSystem, ResourceID resourceID) {
+		synchronized (lock) {
+			Preconditions.checkState(!isShutdown(), "The metric registry has already been shut down.");
+
+			try {
+				queryService = MetricQueryService.startMetricQueryService(actorSystem, resourceID);
+				metricQueryServicePath = AkkaUtils.getAkkaURL(actorSystem, queryService);
+			} catch (Exception e) {
+				LOG.warn("Could not start MetricDumpActor. No metrics will be submitted to the WebInterface.", e);
+			}
+		}
+	}
+
+	/**
+	 * Returns the address under which the {@link MetricQueryService} is reachable.
+	 *
+	 * @return address of the metric query service
+	 */
+	@Nullable
+	public String getMetricQueryServicePath() {
+		return metricQueryServicePath;
+	}
+
+	@Override
+	public char getDelimiter() {
+		return this.globalDelimiter;
+	}
+
+	/**
+	 * Returns the delimiter configured for the reporter with the given index.
+	 *
+	 * @param reporterIndex index of the reporter whose delimiter is requested
+	 * @return the reporter's delimiter, or the global delimiter if no delimiter is known for the index
+	 */
+	@Override
+	public char getDelimiter(int reporterIndex) {
+		// check the bounds explicitly instead of using an IndexOutOfBoundsException for control flow
+		if (reporterIndex < 0 || reporterIndex >= delimiters.size()) {
+			LOG.warn("Delimiter for reporter index {} not found, returning global delimiter.", reporterIndex);
+			return this.globalDelimiter;
+		}
+		return delimiters.get(reporterIndex);
+	}
+
+	/**
+	 * Returns the number of reporters registered at this registry.
+	 *
+	 * @return number of reporters, or 0 once the registry has been shut down
+	 */
+	@Override
+	public int getNumberReporters() {
+		// shutdown() sets reporters to null; read the field once and report zero reporters
+		// instead of failing with a NullPointerException for callers that arrive after shutdown
+		final List<MetricReporter> currentReporters = reporters;
+		return currentReporters == null ? 0 : currentReporters.size();
+	}
+
+	/**
+	 * Returns the reporters of this registry.
+	 *
+	 * @return the internal (not copied) list of reporters, or {@code null} once {@link #shutdown()} has run
+	 */
+	public List<MetricReporter> getReporters() {
+		return reporters;
+	}
+
+	/**
+	 * Returns whether this registry has been shutdown.
+	 *
+	 * @return true, if this registry was shutdown, otherwise false
+	 */
+	public boolean isShutdown() {
+		synchronized (lock) {
+			return reporters == null && executor.isShutdown();
+		}
+	}
+
+	/**
+	 * Shuts down this registry and the associated {@link MetricReporter MetricReporters}.
+	 *
+	 * <p>The steps are: trigger a graceful stop of the query service actor (if one was started), close
+	 * all reporters, shut down the executor, and finally wait for the query service actor to stop.
+	 * If the actor did not stop within the timeout, it is sent a {@link Kill} message.
+	 */
+	public void shutdown() {
+		synchronized (lock) {
+			Future<Boolean> stopFuture = null;
+			FiniteDuration stopTimeout = null;
+
+			if (queryService != null) {
+				// only trigger the stop here; the result is awaited after reporters and executor are closed
+				stopTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
+				stopFuture = Patterns.gracefulStop(queryService, stopTimeout);
+			}
+
+			if (reporters != null) {
+				for (MetricReporter reporter : reporters) {
+					try {
+						reporter.close();
+					} catch (Throwable t) {
+						// one failing reporter must not keep the others from being closed
+						LOG.warn("Metrics reporter did not shut down cleanly", t);
+					}
+				}
+				// a null reporter list is one of the two conditions checked by isShutdown()
+				reporters = null;
+			}
+			shutdownExecutor();
+
+			if (stopFuture != null) {
+				boolean stopped = false;
+
+				try {
+					stopped = Await.result(stopFuture, stopTimeout);
+				} catch (Exception e) {
+					LOG.warn("Query actor did not properly stop.", e);
+				}
+
+				if (!stopped) {
+					// the query actor did not stop in time, let's kill him
+					queryService.tell(Kill.getInstance(), ActorRef.noSender());
+				}
+			}
+		}
+	}
+
+	/**
+	 * Shuts down the executor, waiting up to one second for running tasks to finish before
+	 * cancelling them forcibly.
+	 */
+	private void shutdownExecutor() {
+		if (executor != null) {
+			executor.shutdown();
+
+			try {
+				if (!executor.awaitTermination(1L, TimeUnit.SECONDS)) {
+					executor.shutdownNow();
+				}
+			} catch (InterruptedException e) {
+				executor.shutdownNow();
+				// restore the interrupt flag so that the caller can still observe the interruption
+				Thread.currentThread().interrupt();
+			}
+		}
+	}
+
+	@Override
+	public ScopeFormats getScopeFormats() {
+		return scopeFormats;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Metrics (de)registration
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Announces a newly added metric to all reporters, to the query service (if started) and,
+	 * for {@link View} metrics, to the view updater.
+	 *
+	 * <p>If the registry has already been shut down, only a warning is logged. Failures of a single
+	 * recipient are logged and do not prevent the remaining recipients from being notified.
+	 *
+	 * @param metric the metric that was added
+	 * @param metricName the name of the metric
+	 * @param group the group that contains the metric
+	 */
+	@Override
+	public void register(Metric metric, String metricName, AbstractMetricGroup group) {
+		synchronized (lock) {
+			if (isShutdown()) {
+				LOG.warn("Cannot register metric, because the MetricRegistry has already been shut down.");
+			} else {
+				if (reporters != null) {
+					for (int i = 0; i < reporters.size(); i++) {
+						MetricReporter reporter = reporters.get(i);
+						try {
+							if (reporter != null) {
+								// the front group carries the reporter index i along with the actual group
+								FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
+								reporter.notifyOfAddedMetric(metric, metricName, front);
+							}
+						} catch (Exception e) {
+							LOG.warn("Error while registering metric.", e);
+						}
+					}
+				}
+				try {
+					if (queryService != null) {
+						MetricQueryService.notifyOfAddedMetric(queryService, metric, metricName, group);
+					}
+				} catch (Exception e) {
+					LOG.warn("Error while registering metric.", e);
+				}
+				try {
+					if (metric instanceof View) {
+						// the view updater is created lazily when the first View is registered
+						if (viewUpdater == null) {
+							viewUpdater = new ViewUpdater(executor);
+						}
+						viewUpdater.notifyOfAddedView((View) metric);
+					}
+				} catch (Exception e) {
+					LOG.warn("Error while registering metric.", e);
+				}
+			}
+		}
+	}
+
+	@Override
+	public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
+		synchronized (lock) {
+			if (isShutdown()) {
+				LOG.warn("Cannot unregister metric, because the MetricRegistry has already been shut down.");
+			} else {
+				if (reporters != null) {
+					for (int i = 0; i < reporters.size(); i++) {
+						try {
+						MetricReporter reporter = reporters.get(i);
+							if (reporter != null) {
+								FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
+								reporter.notifyOfRemovedMetric(metric, metricName, front);
+							}
+						} catch (Exception e) {
+							LOG.warn("Error while registering metric.", e);
+						}
+					}
+				}
+				try {
+					if (queryService != null) {
+						MetricQueryService.notifyOfRemovedMetric(queryService, metric);
+					}
+				} catch (Exception e) {
+					LOG.warn("Error while registering metric.", e);
+				}
+				try {
+					if (metric instanceof View) {
+						if (viewUpdater != null) {
+							viewUpdater.notifyOfRemovedView((View) metric);
+						}
+					}
+				} catch (Exception e) {
+					LOG.warn("Error while registering metric.", e);
+				}
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	@VisibleForTesting
+	@Nullable
+	public ActorRef getQueryService() {
+		return queryService;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Task that triggers {@link Scheduled#report()} on a single reporter.
+	 *
+	 * <p>This task is explicitly a static class, so that it does not hold any references to the enclosing
+	 * {@link MetricRegistryImpl} instance.
+	 *
+	 * <p>This is a subtle difference, but very important: With this static class, the enclosing class instance
+	 * may become garbage-collectible, whereas with an anonymous inner class, the thread executing the task
+	 * (which is a GC root) will hold a reference via the task and its enclosing instance pointer.
+	 *
+	 * <p>NOTE(review): the original comment argued with a java.util.Timer that becomes garbage collectible
+	 * together with the registry; the task is run by a ScheduledExecutorService here - confirm whether
+	 * that fail-safe argument still applies.
+	 */
+	private static final class ReporterTask extends TimerTask {
+
+		private final Scheduled reporter;
+
+		private ReporterTask(Scheduled reporter) {
+			this.reporter = reporter;
+		}
+
+		@Override
+		public void run() {
+			try {
+				reporter.report();
+			} catch (Throwable t) {
+				// never let a failure escape: an exception thrown from a task scheduled via
+				// scheduleWithFixedDelay suppresses all subsequent executions of that task
+				LOG.warn("Error while reporting metrics", t);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
index ab59977..66eace5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
@@ -106,7 +106,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
 		this.registry = checkNotNull(registry);
 		this.scopeComponents = checkNotNull(scope);
 		this.parent = parent;
-		this.scopeStrings = new String[registry.getReporters() == null ? 0 : registry.getReporters().size()];
+		this.scopeStrings = new String[registry.getNumberReporters()];
 	}
 
 	public Map<String, String> getAllVariables() {

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index dd352bb..d4248ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.leaderelection.LeaderAddressAndId;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
@@ -68,7 +68,7 @@ public class MiniCluster {
 	private final MiniClusterConfiguration miniClusterConfiguration;
 
 	@GuardedBy("lock") 
-	private MetricRegistry metricRegistry;
+	private MetricRegistryImpl metricRegistry;
 
 	@GuardedBy("lock")
 	private RpcService commonRpcService;
@@ -464,8 +464,8 @@ public class MiniCluster {
 	 * 
 	 * @param config The configuration of the mini cluster
 	 */
-	protected MetricRegistry createMetricRegistry(Configuration config) {
-		return new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+	protected MetricRegistryImpl createMetricRegistry(Configuration config) {
+		return new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
 	}
 
 	/**
@@ -502,7 +502,7 @@ public class MiniCluster {
 			Configuration configuration,
 			HighAvailabilityServices haServices,
 			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry,
+			MetricRegistryImpl metricRegistry,
 			int numResourceManagers,
 			RpcService[] resourceManagerRpcServices) throws Exception {
 
@@ -528,7 +528,7 @@ public class MiniCluster {
 	protected TaskExecutor[] startTaskManagers(
 			Configuration configuration,
 			HighAvailabilityServices haServices,
-			MetricRegistry metricRegistry,
+			MetricRegistryImpl metricRegistry,
 			int numTaskManagers,
 			RpcService[] taskManagerRpcServices) throws Exception {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
index 60d9a66..ca042b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.ExceptionUtils;
@@ -75,7 +75,7 @@ public class MiniClusterJobDispatcher {
 	private final JobManagerServices jobManagerServices;
 
 	/** Registry for all metrics in the mini cluster */
-	private final MetricRegistry metricRegistry;
+	private final MetricRegistryImpl metricRegistry;
 
 	/** The number of JobManagers to launch (more than one simulates a high-availability setup) */
 	private final int numJobManagers;
@@ -104,7 +104,7 @@ public class MiniClusterJobDispatcher {
 			HighAvailabilityServices haServices,
 			BlobServer blobServer,
 			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry) throws Exception {
+			MetricRegistryImpl metricRegistry) throws Exception {
 		this(
 			config,
 			haServices,
@@ -132,7 +132,7 @@ public class MiniClusterJobDispatcher {
 			HighAvailabilityServices haServices,
 			BlobServer blobServer,
 			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry,
+			MetricRegistryImpl metricRegistry,
 			int numJobManagers,
 			RpcService[] rpcServices) throws Exception {
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
index cbefe5a..90fb115 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.runtime.minicluster;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -31,18 +28,25 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
 
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
 /**
  * Mini cluster to run the old JobManager code without embedded high availability services. This
  * class has been implemented because the normal {@link FlinkMiniCluster} has been changed to use
@@ -63,6 +67,8 @@ public class StandaloneMiniCluster {
 
 	private final HighAvailabilityServices highAvailabilityServices;
 
+	private final MetricRegistryImpl metricRegistry;
+
 	private final FiniteDuration timeout;
 
 	private final int port;
@@ -86,21 +92,28 @@ public class StandaloneMiniCluster {
 			Executors.directExecutor(),
 			HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
 
+		metricRegistry = new MetricRegistryImpl(
+			MetricRegistryConfiguration.fromConfiguration(configuration));
+
 		JobManager.startJobManagerActors(
 			configuration,
 			actorSystem,
 			scheduledExecutorService,
 			scheduledExecutorService,
 			highAvailabilityServices,
+			metricRegistry,
 			Option.empty(),
 			JobManager.class,
 			MemoryArchivist.class);
 
+		final ResourceID taskManagerResourceId = ResourceID.generate();
+
 		ActorRef taskManager = TaskManager.startTaskManagerComponentsAndActor(
 			configuration,
-			ResourceID.generate(),
+			taskManagerResourceId,
 			actorSystem,
 			highAvailabilityServices,
+			metricRegistry,
 			LOCAL_HOSTNAME,
 			Option.<String>empty(),
 			true,

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 1d4d4f3..98b80c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -42,7 +42,7 @@ import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.dump.MetricQueryService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
@@ -118,7 +118,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager;
 
 	/** Registry to use for metrics. */
-	private final MetricRegistry metricRegistry;
+	private final MetricRegistryImpl metricRegistry;
 
 	/** Fatal error handler. */
 	private final FatalErrorHandler fatalErrorHandler;
@@ -140,7 +140,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 			HighAvailabilityServices highAvailabilityServices,
 			HeartbeatServices heartbeatServices,
 			SlotManager slotManager,
-			MetricRegistry metricRegistry,
+			MetricRegistryImpl metricRegistry,
 			JobLeaderIdService jobLeaderIdService,
 			FatalErrorHandler fatalErrorHandler) {
 
@@ -498,8 +498,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	}
 
 	@Override
-	public CompletableFuture<Collection<Tuple2<InstanceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) {
-		final ArrayList<Tuple2<InstanceID, String>> metricQueryServicePaths = new ArrayList<>(taskExecutors.size());
+	public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) {
+		final ArrayList<Tuple2<ResourceID, String>> metricQueryServicePaths = new ArrayList<>(taskExecutors.size());
 
 		for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> workerRegistrationEntry : taskExecutors.entrySet()) {
 			final ResourceID tmResourceId = workerRegistrationEntry.getKey();
@@ -508,7 +508,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 			final String tmMetricQueryServicePath = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) +
 				MetricQueryService.METRIC_QUERY_SERVICE_NAME + '_' + tmResourceId.getResourceIdString();
 
-			metricQueryServicePaths.add(Tuple2.of(workerRegistration.getInstanceID(), tmMetricQueryServicePath));
+			metricQueryServicePaths.add(Tuple2.of(tmResourceId, tmMetricQueryServicePath));
 		}
 
 		return CompletableFuture.completedFuture(metricQueryServicePaths);

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 9eacb4b..cc2766b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -176,5 +176,5 @@ public interface ResourceManagerGateway extends FencedRpcGateway<ResourceManager
 	 * @param timeout for the asynchronous operation
 	 * @return Future containing the collection of instance ids and the corresponding metric query service path
 	 */
-	CompletableFuture<Collection<Tuple2<InstanceID, String>>> requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout);
+	CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
index caa3ba0..361bdd4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
@@ -22,7 +22,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.FlinkException;
@@ -55,7 +55,7 @@ public class ResourceManagerRunner implements FatalErrorHandler {
 			final RpcService rpcService,
 			final HighAvailabilityServices highAvailabilityServices,
 			final HeartbeatServices heartbeatServices,
-			final MetricRegistry metricRegistry) throws Exception {
+			final MetricRegistryImpl metricRegistry) throws Exception {
 
 		Preconditions.checkNotNull(resourceId);
 		Preconditions.checkNotNull(configuration);

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index 624f31d..d2b1205 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -23,7 +23,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -45,7 +45,7 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 			HighAvailabilityServices highAvailabilityServices,
 			HeartbeatServices heartbeatServices,
 			SlotManager slotManager,
-			MetricRegistry metricRegistry,
+			MetricRegistryImpl metricRegistry,
 			JobLeaderIdService jobLeaderIdService,
 			FatalErrorHandler fatalErrorHandler) {
 		super(

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
index cf5bfcb..4d6ccd5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
@@ -31,8 +31,8 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.TransientBlobCache;
 import org.apache.flink.runtime.blob.TransientBlobKey;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.rest.handler.RedirectHandler;
 import org.apache.flink.runtime.rest.handler.WebHandler;
@@ -166,13 +166,13 @@ public class TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> im
 		//fetch TaskManager logs if no other process is currently doing it
 		if (lastRequestPending.putIfAbsent(taskManagerID, true) == null) {
 			try {
-				InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(taskManagerID));
-				CompletableFuture<Optional<Instance>> taskManagerFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, timeout);
+				ResourceID resourceId = new ResourceID(new String(StringUtils.hexStringToByte(taskManagerID)));
+				CompletableFuture<Optional<Instance>> taskManagerFuture = jobManagerGateway.requestTaskManagerInstance(resourceId, timeout);
 
 				CompletableFuture<TransientBlobKey> blobKeyFuture = taskManagerFuture.thenCompose(
 					(Optional<Instance> optTMInstance) -> {
 						Instance taskManagerInstance = optTMInstance.orElseThrow(
-							() -> new CompletionException(new FlinkException("Could not find instance with " + instanceID + '.')));
+							() -> new CompletionException(new FlinkException("Could not find instance with " + resourceId + '.')));
 						switch (fileMode) {
 							case LOG:
 								return taskManagerInstance.getTaskManagerGateway().requestTaskManagerLog(timeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
index ad2ee1b..84c6e41 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
@@ -19,19 +19,20 @@
 package org.apache.flink.runtime.rest.handler.legacy;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.StringUtils;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 
 import java.io.IOException;
 import java.io.StringWriter;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
@@ -74,8 +75,16 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler  {
 			// return them in an array. This avoids unnecessary code complexity.
 			// If only one task manager is requested, we only fetch one task manager metrics.
 			if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
-				InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(pathParams.get(TASK_MANAGER_ID_KEY)));
-				CompletableFuture<Optional<Instance>> tmInstanceFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, timeout);
+				final String unescapedString;
+
+				try {
+					unescapedString = URLDecoder.decode(pathParams.get(TASK_MANAGER_ID_KEY), "UTF-8");
+				} catch (UnsupportedEncodingException e) {
+					return FutureUtils.completedExceptionally(new FlinkException("Could not decode task manager id: " + pathParams.get(TASK_MANAGER_ID_KEY) + '.', e));
+				}
+
+				ResourceID resourceId = new ResourceID(unescapedString);
+				CompletableFuture<Optional<Instance>> tmInstanceFuture = jobManagerGateway.requestTaskManagerInstance(resourceId, timeout);
 
 				return tmInstanceFuture.thenApplyAsync(
 					(Optional<Instance> optTaskManager) -> {
@@ -116,7 +125,7 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler  {
 
 		for (Instance instance : instances) {
 			gen.writeStartObject();
-			gen.writeStringField("id", instance.getId().toString());
+			gen.writeStringField("id", instance.getTaskManagerID().getResourceIdString());
 			gen.writeStringField("path", instance.getTaskManagerGateway().getAddress());
 			gen.writeNumberField("dataPort", instance.getTaskManagerLocation().dataPort());
 			gen.writeNumberField("timeSinceLastHeartbeat", instance.getLastHeartBeat());
@@ -131,7 +140,7 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler  {
 			if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
 				fetcher.update();
 
-				MetricStore.TaskManagerMetricStore metrics = fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString());
+				MetricStore.TaskManagerMetricStore metrics = fetcher.getMetricStore().getTaskManagerMetricStore(instance.getTaskManagerID().getResourceIdString());
 				if (metrics != null) {
 					gen.writeObjectFieldStart("metrics");
 					long heapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
index 1bfb9f2..e71a1d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.rest.handler.legacy.metrics;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
@@ -145,20 +145,20 @@ public class MetricFetcher<T extends RestfulGateway> {
 				// TODO: Once the old code has been ditched, remove the explicit TaskManager query service discovery
 				// TODO: and return it as part of requestQueryServicePaths. Moreover, change the MetricStore such that
 				// TODO: we don't have to explicitly retain the valid TaskManagers, e.g. letting it be a cache with expiry time
-				CompletableFuture<Collection<Tuple2<InstanceID, String>>> taskManagerQueryServicePathsFuture = leaderGateway
+				CompletableFuture<Collection<Tuple2<ResourceID, String>>> taskManagerQueryServicePathsFuture = leaderGateway
 					.requestTaskManagerMetricQueryServicePaths(timeout);
 
 				taskManagerQueryServicePathsFuture.whenCompleteAsync(
-					(Collection<Tuple2<InstanceID, String>> queryServicePaths, Throwable throwable) -> {
+					(Collection<Tuple2<ResourceID, String>> queryServicePaths, Throwable throwable) -> {
 						if (throwable != null) {
 							LOG.warn("Requesting TaskManager's path for query services failed.", throwable);
 						} else {
 							List<String> taskManagersToRetain = queryServicePaths
 								.stream()
 								.map(
-									(Tuple2<InstanceID, String> tuple) -> {
+									(Tuple2<ResourceID, String> tuple) -> {
 										retrieveAndQueryMetrics(tuple.f1);
-										return tuple.f0.toString();
+										return tuple.f0.getResourceIdString();
 									}
 								).collect(Collectors.toList());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index cd67705..a956111 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -55,7 +55,7 @@ import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.registration.RegistrationConnectionListener;
@@ -135,7 +135,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 	private final NetworkEnvironment networkEnvironment;
 
 	/** The metric registry in the task manager */
-	private final MetricRegistry metricRegistry;
+	private final MetricRegistryImpl metricRegistry;
 
 	/** The heartbeat manager for job manager in the task manager */
 	private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager;
@@ -179,7 +179,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 			NetworkEnvironment networkEnvironment,
 			HighAvailabilityServices haServices,
 			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry,
+			MetricRegistryImpl metricRegistry,
 			TaskManagerMetricGroup taskManagerMetricGroup,
 			BroadcastVariableManager broadcastVariableManager,
 			FileCache fileCache,

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 782ab07..5a69bb1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -29,22 +29,19 @@ import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
-import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
-import org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
@@ -86,7 +83,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
 
 	private final HighAvailabilityServices highAvailabilityServices;
 
-	private final MetricRegistry metricRegistry;
+	private final MetricRegistryImpl metricRegistry;
 
 	/** Executor used to run future callbacks */
 	private final ExecutorService executor;
@@ -112,7 +109,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
 
 		HeartbeatServices heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
 
-		metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration));
+		metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
 
 		final ActorSystem actorSystem = ((AkkaRpcService) rpcService).getActorSystem();
 		metricRegistry.startQueryService(actorSystem, resourceId);
@@ -250,7 +247,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
 		RpcService rpcService,
 		HighAvailabilityServices highAvailabilityServices,
 		HeartbeatServices heartbeatServices,
-		MetricRegistry metricRegistry,
+		MetricRegistryImpl metricRegistry,
 		boolean localCommunicationOnly,
 		FatalErrorHandler fatalErrorHandler) throws Exception {
 
@@ -269,18 +266,11 @@ public class TaskManagerRunner implements FatalErrorHandler {
 
 		TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
 			taskManagerServicesConfiguration,
-			resourceID);
+			resourceID,
+			metricRegistry);
 
 		TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
 
-		TaskManagerMetricGroup taskManagerMetricGroup = new TaskManagerMetricGroup(
-			metricRegistry,
-			taskManagerServices.getTaskManagerLocation().getHostname(),
-			resourceID.toString());
-
-		// Initialize the TM metrics
-		TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, taskManagerServices.getNetworkEnvironment());
-
 		return new TaskExecutor(
 			rpcService,
 			taskManagerConfiguration,
@@ -291,7 +281,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
 			highAvailabilityServices,
 			heartbeatServices,
 			metricRegistry,
-			taskManagerMetricGroup,
+			taskManagerServices.getTaskManagerMetricGroup(),
 			taskManagerServices.getBroadcastVariableManager(),
 			taskManagerServices.getFileCache(),
 			taskManagerServices.getTaskSlotTable(),

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 2baf644..85e62c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.query.KvStateClientProxy;
 import org.apache.flink.runtime.query.KvStateRegistry;
@@ -62,7 +63,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 /**
  * Container for {@link TaskExecutor} services such as the {@link MemoryManager}, {@link IOManager},
- * {@link NetworkEnvironment} and the {@link MetricRegistry}.
+ * {@link NetworkEnvironment} and the {@link MetricRegistryImpl}.
  */
 public class TaskManagerServices {
 	private static final Logger LOG = LoggerFactory.getLogger(TaskManagerServices.class);
@@ -72,7 +73,6 @@ public class TaskManagerServices {
 	private final MemoryManager memoryManager;
 	private final IOManager ioManager;
 	private final NetworkEnvironment networkEnvironment;
-	private final MetricRegistry metricRegistry;
 	private final TaskManagerMetricGroup taskManagerMetricGroup;
 	private final BroadcastVariableManager broadcastVariableManager;
 	private final FileCache fileCache;
@@ -85,7 +85,6 @@ public class TaskManagerServices {
 		MemoryManager memoryManager,
 		IOManager ioManager,
 		NetworkEnvironment networkEnvironment,
-		MetricRegistry metricRegistry,
 		TaskManagerMetricGroup taskManagerMetricGroup,
 		BroadcastVariableManager broadcastVariableManager,
 		FileCache fileCache,
@@ -97,7 +96,6 @@ public class TaskManagerServices {
 		this.memoryManager = Preconditions.checkNotNull(memoryManager);
 		this.ioManager = Preconditions.checkNotNull(ioManager);
 		this.networkEnvironment = Preconditions.checkNotNull(networkEnvironment);
-		this.metricRegistry = Preconditions.checkNotNull(metricRegistry);
 		this.taskManagerMetricGroup = Preconditions.checkNotNull(taskManagerMetricGroup);
 		this.broadcastVariableManager = Preconditions.checkNotNull(broadcastVariableManager);
 		this.fileCache = Preconditions.checkNotNull(fileCache);
@@ -126,10 +124,6 @@ public class TaskManagerServices {
 		return taskManagerLocation;
 	}
 
-	public MetricRegistry getMetricRegistry() {
-		return metricRegistry;
-	}
-
 	public TaskManagerMetricGroup getTaskManagerMetricGroup() {
 		return taskManagerMetricGroup;
 	}
@@ -163,12 +157,14 @@ public class TaskManagerServices {
 	 *
 	 * @param resourceID resource ID of the task manager
 	 * @param taskManagerServicesConfiguration task manager configuration
+	 * @param metricRegistry metric registry used to create the TaskManagerMetricGroup
 	 * @return task manager components
 	 * @throws Exception
 	 */
 	public static TaskManagerServices fromConfiguration(
 			TaskManagerServicesConfiguration taskManagerServicesConfiguration,
-			ResourceID resourceID) throws Exception {
+			ResourceID resourceID,
+			MetricRegistry metricRegistry) throws Exception {
 
 		// pre-start checks
 		checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());
@@ -187,9 +183,6 @@ public class TaskManagerServices {
 		// start the I/O manager, it will create some temp directories.
 		final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
 
-		final MetricRegistry metricRegistry = new MetricRegistry(
-				taskManagerServicesConfiguration.getMetricRegistryConfiguration());
-
 		final TaskManagerMetricGroup taskManagerMetricGroup = new TaskManagerMetricGroup(
 			metricRegistry,
 			taskManagerLocation.getHostname(),
@@ -223,7 +216,6 @@ public class TaskManagerServices {
 			memoryManager,
 			ioManager,
 			network,
-			metricRegistry,
 			taskManagerMetricGroup,
 			broadcastVariableManager,
 			fileCache,

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index bfd37bc..990fb22 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.NetUtils;
@@ -72,8 +71,6 @@ public class TaskManagerServicesConfiguration {
 
 	private final float memoryFraction;
 
-	private final MetricRegistryConfiguration metricRegistryConfiguration;
-
 	private final long timerServiceShutdownTimeout;
 
 	public TaskManagerServicesConfiguration(
@@ -85,7 +82,6 @@ public class TaskManagerServicesConfiguration {
 			long configuredMemory,
 			boolean preAllocateMemory,
 			float memoryFraction,
-			MetricRegistryConfiguration metricRegistryConfiguration,
 			long timerServiceShutdownTimeout) {
 
 		this.taskManagerAddress = checkNotNull(taskManagerAddress);
@@ -98,8 +94,6 @@ public class TaskManagerServicesConfiguration {
 		this.preAllocateMemory = preAllocateMemory;
 		this.memoryFraction = memoryFraction;
 
-		this.metricRegistryConfiguration = checkNotNull(metricRegistryConfiguration);
-
 		checkArgument(timerServiceShutdownTimeout >= 0L, "The timer " +
 			"service shutdown timeout must be greater or equal to 0.");
 		this.timerServiceShutdownTimeout = timerServiceShutdownTimeout;
@@ -148,10 +142,6 @@ public class TaskManagerServicesConfiguration {
 		return preAllocateMemory;
 	}
 
-	public MetricRegistryConfiguration getMetricRegistryConfiguration() {
-		return metricRegistryConfiguration;
-	}
-
 	public long getTimerServiceShutdownTimeout() {
 		return timerServiceShutdownTimeout;
 	}
@@ -211,8 +201,6 @@ public class TaskManagerServicesConfiguration {
 			TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
 			"MemoryManager fraction of the free memory must be between 0.0 and 1.0");
 
-		final MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(configuration);
-
 		long timerServiceShutdownTimeout = AkkaUtils.getTimeout(configuration).toMillis();
 
 		return new TaskManagerServicesConfiguration(
@@ -224,7 +212,6 @@ public class TaskManagerServicesConfiguration {
 			configuredMemory,
 			preAllocateMemory,
 			memoryFraction,
-			metricRegistryConfiguration,
 			timerServiceShutdownTimeout);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
index d871b06..331e96b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.webmonitor;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
@@ -95,5 +95,5 @@ public interface RestfulGateway extends RpcGateway {
 	 * @param timeout for the asynchronous operation
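-	 * @return Future containing the collection of instance ids and the corresponding metric query service path
+	 * @return Future containing the collection of resource ids and the corresponding metric query service path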
 	 */
-	CompletableFuture<Collection<Tuple2<InstanceID, String>>> requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout);
+	CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
index 74ef1de..1c573c0 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
@@ -35,7 +35,8 @@ import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
 import org.apache.flink.runtime.messages.Acknowledge
 import org.apache.flink.runtime.messages.JobManagerMessages.{CurrentJobStatus, JobNotFound, RequestJobStatus}
-import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
+import org.apache.flink.runtime.metrics.{MetricRegistryImpl => FlinkMetricRegistry}
 
 import scala.concurrent.duration._
 import scala.language.postfixOps
@@ -74,7 +75,7 @@ abstract class ContaineredJobManager(
     submittedJobGraphs : SubmittedJobGraphStore,
     checkpointRecoveryFactory : CheckpointRecoveryFactory,
     jobRecoveryTimeout: FiniteDuration,
-    metricsRegistry: Option[FlinkMetricRegistry],
+    jobManagerMetricGroup: JobManagerMetricGroup,
     optRestAddress: Option[String])
   extends JobManager(
     flinkConfiguration,
@@ -91,7 +92,7 @@ abstract class ContaineredJobManager(
     submittedJobGraphs,
     checkpointRecoveryFactory,
     jobRecoveryTimeout,
-    metricsRegistry,
+    jobManagerMetricGroup,
     optRestAddress) {
 
   val jobPollingInterval: FiniteDuration

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 0435046..d40a0fd 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -34,7 +34,6 @@ import org.apache.flink.api.common.time.Time
 import org.apache.flink.configuration._
 import org.apache.flink.core.fs.{FileSystem, Path}
 import org.apache.flink.core.io.InputSplitAssigner
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup
 import org.apache.flink.metrics.{Gauge, MetricGroup}
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
@@ -50,10 +49,10 @@ import org.apache.flink.runtime.concurrent.{FutureUtils, ScheduledExecutorServic
 import org.apache.flink.runtime.execution.SuppressRestartsException
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders.ResolveOrder
 import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, LibraryCacheManager}
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
 import org.apache.flink.runtime.executiongraph._
-import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils}
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution
+import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils}
 import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceID, InstanceManager}
 import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus}
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener
@@ -66,20 +65,18 @@ import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
 import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.messages.Messages.Disconnect
 import org.apache.flink.runtime.messages.RegistrationMessages._
-import org.apache.flink.runtime.messages.{Acknowledge, StackTrace}
 import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
 import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState
 import org.apache.flink.runtime.messages.accumulators._
 import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint, DeclineCheckpoint}
 import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _}
+import org.apache.flink.runtime.messages.{Acknowledge, StackTrace}
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
-import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistry => FlinkMetricRegistry}
 import org.apache.flink.runtime.metrics.util.MetricUtils
-import org.apache.flink.runtime.net.SSLUtils
+import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistryImpl, MetricRegistry => FlinkMetricRegistry}
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.query.KvStateMessage.{LookupKvStateLocation, NotifyKvStateRegistered, NotifyKvStateUnregistered}
 import org.apache.flink.runtime.query.{KvStateMessage, UnknownKvStateLocation}
-import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils
 import org.apache.flink.runtime.security.{SecurityConfiguration, SecurityUtils}
 import org.apache.flink.runtime.taskexecutor.TaskExecutor
 import org.apache.flink.runtime.taskmanager.TaskManager
@@ -137,7 +134,7 @@ class JobManager(
     protected val submittedJobGraphs : SubmittedJobGraphStore,
     protected val checkpointRecoveryFactory : CheckpointRecoveryFactory,
     protected val jobRecoveryTimeout: FiniteDuration,
-    protected val metricsRegistry: Option[FlinkMetricRegistry],
+    protected val jobManagerMetricGroup: JobManagerMetricGroup,
     protected val optRestAddress: Option[String])
   extends FlinkActor
   with LeaderSessionMessageFilter // mixin oder is important, we want filtering after logging
@@ -154,16 +151,6 @@ class JobManager(
 
   var leaderSessionID: Option[UUID] = None
 
-  protected val jobManagerMetricGroup : Option[JobManagerMetricGroup] = metricsRegistry match {
-    case Some(registry) =>
-      val host = flinkConfiguration.getString(JobManagerOptions.ADDRESS)
-      Option(new JobManagerMetricGroup(
-        registry, NetUtils.unresolvedHostToNormalizedString(host)))
-    case None =>
-      log.warn("Could not instantiate JobManager metrics.")
-      None
-  }
-
   /** Futures which have to be completed before terminating the job manager */
   var futuresToComplete: Option[Seq[Future[Unit]]] = None
 
@@ -205,12 +192,7 @@ class JobManager(
         throw new RuntimeException("Could not start the submitted job graphs service.", e)
     }
 
-    jobManagerMetricGroup match {
-      case Some(group) =>
-        instantiateMetrics(group)
-      case None =>
-        log.warn("Could not instantiate JobManager metric group.")
-    }
+    instantiateMetrics(jobManagerMetricGroup)
   }
 
   override def postStop(): Unit = {
@@ -250,6 +232,8 @@ class JobManager(
       archive ! decorateMessage(PoisonPill)
     }
 
+    jobManagerMetricGroup.close()
+
     instanceManager.shutdown()
     scheduler.shutdown()
     libraryCacheManager.shutdown()
@@ -260,13 +244,6 @@ class JobManager(
       case e: IOException => log.error("Could not properly shutdown the blob server.", e)
     }
 
-    // failsafe shutdown of the metrics registry
-    try {
-      metricsRegistry.foreach(_.shutdown())
-    } catch {
-      case t: Exception => log.error("MetricRegistry did not shutdown properly.", t)
-    }
-
     log.debug(s"Job manager ${self.path} is completely stopped.")
   }
 
@@ -1073,9 +1050,9 @@ class JobManager(
         )
       )
 
-    case RequestTaskManagerInstance(instanceID) =>
+    case RequestTaskManagerInstance(resourceId) =>
       sender ! decorateMessage(
-        TaskManagerInstance(Option(instanceManager.getRegisteredInstanceById(instanceID)))
+        TaskManagerInstance(Option(instanceManager.getRegisteredInstance(resourceId)))
       )
 
     case Heartbeat(instanceID, accumulators) =>
@@ -1283,15 +1260,7 @@ class JobManager(
 
         log.info(s"Using restart strategy $restartStrategy for $jobId.")
 
-        val jobMetrics = jobManagerMetricGroup match {
-          case Some(group) =>
-            group.addJob(jobGraph) match {
-              case (jobGroup:Any) => jobGroup
-              case null => new UnregisteredMetricsGroup()
-            }
-          case None =>
-            new UnregisteredMetricsGroup()
-        }
+        val jobMetrics = jobManagerMetricGroup.addJob(jobGraph)
 
         val numSlots = scheduler.getTotalNumberOfSlots()
 
@@ -1791,7 +1760,7 @@ class JobManager(
     libraryCacheManager.unregisterJob(jobID)
     blobServer.cleanupJob(jobID)
 
-    jobManagerMetricGroup.foreach(_.removeJob(jobID))
+    jobManagerMetricGroup.removeJob(jobID)
 
     futureOption
   }
@@ -2042,7 +2011,12 @@ object JobManager {
     val highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
       configuration,
       ioExecutor,
-      AddressResolution.NO_ADDRESS_RESOLUTION);
+      AddressResolution.NO_ADDRESS_RESOLUTION)
+
+    val metricRegistry = new MetricRegistryImpl(
+      MetricRegistryConfiguration.fromConfiguration(configuration))
+
+    metricRegistry.startQueryService(jobManagerSystem, null)
 
     val (_, _, webMonitorOption, _) = try {
       startJobManagerActors(
@@ -2053,6 +2027,7 @@ object JobManager {
         futureExecutor,
         ioExecutor,
         highAvailabilityServices,
+        metricRegistry,
         classOf[JobManager],
         classOf[MemoryArchivist],
         Option(classOf[StandaloneResourceManager])
@@ -2085,6 +2060,13 @@ object JobManager {
         LOG.warn("Could not properly stop the high availability services.", t)
     }
 
+    try {
+      metricRegistry.shutdown()
+    } catch {
+      case t: Throwable =>
+        LOG.warn("Could not properly shut down the metric registry.", t)
+    }
+
     FlinkExecutors.gracefulShutdown(
       timeout.toMillis,
       TimeUnit.MILLISECONDS,
@@ -2191,6 +2173,7 @@ object JobManager {
       futureExecutor: ScheduledExecutorService,
       ioExecutor: Executor,
       highAvailabilityServices: HighAvailabilityServices,
+      metricRegistry: FlinkMetricRegistry,
       jobManagerClass: Class[_ <: JobManager],
       archiveClass: Class[_ <: MemoryArchivist],
       resourceManagerClass: Option[Class[_ <: FlinkResourceManager[_]]])
@@ -2231,6 +2214,7 @@ object JobManager {
         futureExecutor,
         ioExecutor,
         highAvailabilityServices,
+        metricRegistry,
         webMonitor.map(_.getRestAddress),
         jobManagerClass,
         archiveClass)
@@ -2250,11 +2234,14 @@ object JobManager {
       if (executionMode == JobManagerMode.LOCAL) {
         LOG.info("Starting embedded TaskManager for JobManager's LOCAL execution mode")
 
+        val resourceId = ResourceID.generate()
+
         val taskManagerActor = TaskManager.startTaskManagerComponentsAndActor(
           configuration,
-          ResourceID.generate(),
+          resourceId,
           jobManagerSystem,
           highAvailabilityServices,
+          metricRegistry,
           externalHostname,
           Some(TaskExecutor.TASK_MANAGER_NAME),
           localTaskManagerCommunication = true,
@@ -2433,7 +2420,8 @@ object JobManager {
       configuration: Configuration,
       futureExecutor: ScheduledExecutorService,
       ioExecutor: Executor,
-      blobStore: BlobStore) :
+      blobStore: BlobStore,
+      metricRegistry: FlinkMetricRegistry) :
     (InstanceManager,
     FlinkScheduler,
     BlobServer,
@@ -2443,7 +2431,7 @@ object JobManager {
     Int, // number of archived jobs
     Option[Path], // archive path
     FiniteDuration, // timeout for job recovery
-    Option[FlinkMetricRegistry]
+    JobManagerMetricGroup
    ) = {
 
     val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration)
@@ -2525,12 +2513,9 @@ object JobManager {
       }
     }
 
-    val metricRegistry = try {
-      Option(new FlinkMetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration)))
-    } catch {
-      case _: Exception =>
-        None
-    }
+    val jobManagerMetricGroup = new JobManagerMetricGroup(
+      metricRegistry,
+      configuration.getString(JobManagerOptions.ADDRESS))
 
     (instanceManager,
       scheduler,
@@ -2541,7 +2526,7 @@ object JobManager {
       archiveCount,
       archivePath,
       jobRecoveryTimeout,
-      metricRegistry)
+      jobManagerMetricGroup)
   }
 
   /**
@@ -2564,6 +2549,7 @@ object JobManager {
       futureExecutor: ScheduledExecutorService,
       ioExecutor: Executor,
       highAvailabilityServices: HighAvailabilityServices,
+      metricRegistry: FlinkMetricRegistry,
       optRestAddress: Option[String],
       jobManagerClass: Class[_ <: JobManager],
       archiveClass: Class[_ <: MemoryArchivist])
@@ -2575,6 +2561,7 @@ object JobManager {
       futureExecutor,
       ioExecutor,
       highAvailabilityServices,
+      metricRegistry,
       optRestAddress,
       Some(JobMaster.JOB_MANAGER_NAME),
       Some(JobMaster.ARCHIVE_NAME),
@@ -2606,6 +2593,7 @@ object JobManager {
       futureExecutor: ScheduledExecutorService,
       ioExecutor: Executor,
       highAvailabilityServices: HighAvailabilityServices,
+      metricRegistry: FlinkMetricRegistry,
       optRestAddress: Option[String],
       jobManagerActorName: Option[String],
       archiveActorName: Option[String],
@@ -2622,11 +2610,12 @@ object JobManager {
     archiveCount,
     archivePath,
     jobRecoveryTimeout,
-    metricsRegistry) = createJobManagerComponents(
+    jobManagerMetricGroup) = createJobManagerComponents(
       configuration,
       futureExecutor,
       ioExecutor,
-      highAvailabilityServices.createBlobStore())
+      highAvailabilityServices.createBlobStore(),
+      metricRegistry)
 
     val archiveProps = getArchiveProps(archiveClass, archiveCount, archivePath)
 
@@ -2653,7 +2642,7 @@ object JobManager {
       highAvailabilityServices.getSubmittedJobGraphStore(),
       highAvailabilityServices.getCheckpointRecoveryFactory(),
       jobRecoveryTimeout,
-      metricsRegistry,
+      jobManagerMetricGroup,
       optRestAddress)
 
     val jobManager: ActorRef = jobManagerActorName match {
@@ -2661,12 +2650,6 @@ object JobManager {
       case None => actorSystem.actorOf(jobManagerProps)
     }
 
-    metricsRegistry match {
-      case Some(registry) =>
-        registry.startQueryService(actorSystem, null)
-      case None =>
-    }
-
     (jobManager, archive)
   }
 
@@ -2693,7 +2676,7 @@ object JobManager {
     submittedJobGraphStore: SubmittedJobGraphStore,
     checkpointRecoveryFactory: CheckpointRecoveryFactory,
     jobRecoveryTimeout: FiniteDuration,
-    metricsRegistry: Option[FlinkMetricRegistry],
+    jobManagerMetricGroup: JobManagerMetricGroup,
     optRestAddress: Option[String]): Props = {
 
     Props(
@@ -2712,7 +2695,7 @@ object JobManager {
       submittedJobGraphStore,
       checkpointRecoveryFactory,
       jobRecoveryTimeout,
-      metricsRegistry,
+      jobManagerMetricGroup,
       optRestAddress)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 3e896ca..5c19c7a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -24,8 +24,9 @@ import java.util.UUID
 import akka.actor.ActorRef
 import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.akka.ListeningBehaviour
-import org.apache.flink.runtime.blob.{PermanentBlobKey}
+import org.apache.flink.runtime.blob.PermanentBlobKey
 import org.apache.flink.runtime.client.{JobStatusMessage, SerializedJobExecutionResult}
+import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.executiongraph.{AccessExecutionGraph, ExecutionAttemptID, ExecutionGraph}
 import org.apache.flink.runtime.instance.{Instance, InstanceID}
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID
@@ -419,9 +420,9 @@ object JobManagerMessages {
   /**
    * Requests the [[Instance]] object of the task manager with the given instance ID
    *
-   * @param instanceID Instance ID of the task manager
+   * @param resourceId identifying the TaskManager which shall be retrieved
    */
-  case class RequestTaskManagerInstance(instanceID: InstanceID)
+  case class RequestTaskManagerInstance(resourceId: ResourceID)
 
   /**
    * Returns the [[Instance]] object of the requested task manager. This is in response to

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index c152f4a..689d98f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode
 import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService}
 import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
+import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistryImpl}
 import org.apache.flink.runtime.util.{ExecutorThreadFactory, Hardware}
 import org.apache.flink.runtime.webmonitor.retriever.impl.{AkkaJobManagerRetriever, AkkaQueryServiceRetriever}
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
@@ -121,6 +122,9 @@ abstract class FlinkMiniCluster(
     Hardware.getNumberCPUCores(),
     new ExecutorThreadFactory("mini-cluster-io"))
 
+  protected val metricRegistry = new MetricRegistryImpl(
+    MetricRegistryConfiguration.fromConfiguration(originalConfiguration))
+
   def this(configuration: Configuration, useSingleActorSystem: Boolean) {
     this(
       configuration,
@@ -325,6 +329,10 @@ abstract class FlinkMiniCluster(
 
     lazy val singleActorSystem = startJobManagerActorSystem(0)
 
+    if (originalConfiguration.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false)) {
+      metricRegistry.startQueryService(singleActorSystem, null)
+    }
+
     val (jmActorSystems, jmActors) =
       (for(i <- 0 until numJobManagers) yield {
         val actorSystem = if(useSingleActorSystem) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index e22230e..e9bdb2a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.minicluster
 
 import java.net.InetAddress
+import java.util.UUID
 import java.util.concurrent.{Executor, ScheduledExecutorService}
 
 import akka.actor.{ActorRef, ActorSystem, Props}
@@ -46,7 +47,8 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.messages.JobManagerMessages
 import org.apache.flink.runtime.messages.JobManagerMessages.{RunningJobsStatus, StoppingFailure, StoppingResponse}
-import org.apache.flink.runtime.metrics.MetricRegistry
+import org.apache.flink.runtime.metrics.groups.{JobManagerMetricGroup, TaskManagerMetricGroup}
+import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistryImpl}
 import org.apache.flink.runtime.taskexecutor.{TaskExecutor, TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration}
 import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
 import org.apache.flink.runtime.util.EnvironmentInformation
@@ -83,6 +85,12 @@ class LocalFlinkMiniCluster(
 
   def this(userConfiguration: Configuration) = this(userConfiguration, true)
 
+  override def startInternalShutdown() {
+    metricRegistry.shutdown()
+
+    super.startInternalShutdown()
+  }
+
   // --------------------------------------------------------------------------
 
   override def generateConfiguration(userConfiguration: Configuration): Configuration = {
@@ -137,23 +145,20 @@ class LocalFlinkMiniCluster(
     }
 
     val (instanceManager,
-    scheduler,
-    blobServer,
-    libraryCacheManager,
-    restartStrategyFactory,
-    timeout,
-    archiveCount,
-    archivePath,
-    jobRecoveryTimeout,
-    metricsRegistry) = JobManager.createJobManagerComponents(
+      scheduler,
+      blobServer,
+      libraryCacheManager,
+      restartStrategyFactory,
+      timeout,
+      archiveCount,
+      archivePath,
+      jobRecoveryTimeout,
+      jobManagerMetricGroup) = JobManager.createJobManagerComponents(
       config,
       futureExecutor,
       ioExecutor,
-      highAvailabilityServices.createBlobStore())
-
-    if (config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false)) {
-      metricsRegistry.get.startQueryService(system, null)
-    }
+      highAvailabilityServices.createBlobStore(),
+      metricRegistry)
 
     val archive = system.actorOf(
       getArchiveProps(
@@ -180,7 +185,7 @@ class LocalFlinkMiniCluster(
         highAvailabilityServices.getSubmittedJobGraphStore(),
         highAvailabilityServices.getCheckpointRecoveryFactory(),
         jobRecoveryTimeout,
-        metricsRegistry,
+        jobManagerMetricGroup,
         optRestAddress),
       jobManagerName)
   }
@@ -248,9 +253,8 @@ class LocalFlinkMiniCluster(
 
     val taskManagerServices = TaskManagerServices.fromConfiguration(
       taskManagerServicesConfiguration,
-      resourceID)
-
-    val metricRegistry = taskManagerServices.getMetricRegistry()
+      resourceID,
+      metricRegistry)
 
     val props = getTaskManagerProps(
       taskManagerClass,
@@ -260,7 +264,7 @@ class LocalFlinkMiniCluster(
       taskManagerServices.getMemoryManager(),
       taskManagerServices.getIOManager(),
       taskManagerServices.getNetworkEnvironment,
-      metricRegistry)
+      taskManagerServices.getTaskManagerMetricGroup)
 
     if (config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false)) {
       metricRegistry.startQueryService(system, resourceID)
@@ -296,7 +300,7 @@ class LocalFlinkMiniCluster(
       submittedJobGraphStore: SubmittedJobGraphStore,
       checkpointRecoveryFactory: CheckpointRecoveryFactory,
       jobRecoveryTimeout: FiniteDuration,
-      metricsRegistry: Option[MetricRegistry],
+      jobManagerMetricGroup: JobManagerMetricGroup,
       optRestAddress: Option[String])
     : Props = {
 
@@ -316,7 +320,7 @@ class LocalFlinkMiniCluster(
       submittedJobGraphStore,
       checkpointRecoveryFactory,
       jobRecoveryTimeout,
-      metricsRegistry,
+      jobManagerMetricGroup,
       optRestAddress)
   }
 
@@ -328,7 +332,7 @@ class LocalFlinkMiniCluster(
     memoryManager: MemoryManager,
     ioManager: IOManager,
     networkEnvironment: NetworkEnvironment,
-    metricsRegistry: MetricRegistry): Props = {
+    taskManagerMetricGroup: TaskManagerMetricGroup): Props = {
 
     TaskManager.getTaskManagerProps(
       taskManagerClass,
@@ -339,7 +343,7 @@ class LocalFlinkMiniCluster(
       ioManager,
       networkEnvironment,
       highAvailabilityServices,
-      metricsRegistry)
+      taskManagerMetricGroup)
   }
 
   def getResourceManagerProps(

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index cc01a8d..f209dac 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -22,7 +22,7 @@ import java.io.{File, FileInputStream, IOException}
 import java.lang.management.ManagementFactory
 import java.net.{BindException, InetAddress, InetSocketAddress, ServerSocket}
 import java.util
-import java.util.concurrent.{Callable, TimeUnit}
+import java.util.concurrent.{Callable, TimeUnit, TimeoutException}
 import java.util.{Collections, UUID}
 
 import _root_.akka.actor._
@@ -63,8 +63,7 @@ import org.apache.flink.runtime.messages.TaskMessages._
 import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, NotifyCheckpointComplete, TriggerCheckpoint}
 import org.apache.flink.runtime.messages.{Acknowledge, StackTraceSampleResponse}
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
-import org.apache.flink.runtime.metrics.util.MetricUtils
-import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
+import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistryImpl, MetricRegistry => FlinkMetricRegistry}
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.security.{SecurityConfiguration, SecurityUtils}
 import org.apache.flink.runtime.taskexecutor.{TaskExecutor, TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration}
@@ -127,7 +126,7 @@ class TaskManager(
     protected val network: NetworkEnvironment,
     protected val numberOfSlots: Int,
     protected val highAvailabilityServices: HighAvailabilityServices,
-    protected val metricsRegistry: FlinkMetricRegistry)
+    protected val taskManagerMetricGroup: TaskManagerMetricGroup)
   extends FlinkActor
   with LeaderSessionMessageFilter // Mixin order is important: We want to filter after logging
   with LogMessages // Mixin order is important: first we want to support message logging
@@ -154,8 +153,6 @@ class TaskManager(
     getJobManagerLeaderRetriever(
       HighAvailabilityServices.DEFAULT_JOB_ID)
 
-  private var taskManagerMetricGroup : TaskManagerMetricGroup = _
-
   /** Actors which want to be notified once this task manager has been
     * registered at the job manager */
   private val waitForRegistration = scala.collection.mutable.Set[ActorRef]()
@@ -258,13 +255,8 @@ class TaskManager(
     } catch {
       case t: Exception => log.error("FileCache did not shutdown properly.", t)
     }
-    
-    // failsafe shutdown of the metrics registry
-    try {
-      metricsRegistry.shutdown()
-    } catch {
-      case t: Exception => log.error("MetricRegistry did not shutdown properly.", t)
-    }
+
+    taskManagerMetricGroup.close()
 
     log.info(s"Task manager ${self.path} is completely shut down.")
   }
@@ -980,12 +972,6 @@ class TaskManager(
         throw new RuntimeException(message, e)
     }
     
-    taskManagerMetricGroup = 
-      new TaskManagerMetricGroup(metricsRegistry, location.getHostname, id.toString)
-    
-    MetricUtils.instantiateStatusMetrics(taskManagerMetricGroup)
-    MetricUtils.instantiateNetworkMetrics(taskManagerMetricGroup, network)
-    
     // watch job manager to detect when it dies
     context.watch(jobManager)
 
@@ -1832,15 +1818,22 @@ object TaskManager {
       actorSystemPort,
       LOG.logger)
 
+    val metricRegistry = new MetricRegistryImpl(
+      MetricRegistryConfiguration.fromConfiguration(configuration))
+
+    metricRegistry.startQueryService(taskManagerSystem, resourceID)
+
     // start all the TaskManager services (network stack,  library cache, ...)
     // and the TaskManager actor
     try {
+
       LOG.info("Starting TaskManager actor")
       val taskManager = startTaskManagerComponentsAndActor(
         configuration,
         resourceID,
         taskManagerSystem,
         highAvailabilityServices,
+        metricRegistry,
         taskManagerHostname,
         Some(TaskExecutor.TASK_MANAGER_NAME),
         localTaskManagerCommunication = false,
@@ -1893,6 +1886,9 @@ object TaskManager {
         }
         throw t
     }
+
+    // shut down the metric registry (this also stops its metric query service)
+    metricRegistry.shutdown()
   }
 
   /**
@@ -1984,6 +1980,7 @@ object TaskManager {
       resourceID: ResourceID,
       actorSystem: ActorSystem,
       highAvailabilityServices: HighAvailabilityServices,
+      metricRegistry: FlinkMetricRegistry,
       taskManagerHostname: String,
       taskManagerActorName: Option[String],
       localTaskManagerCommunication: Boolean,
@@ -1999,9 +1996,8 @@ object TaskManager {
 
     val taskManagerServices = TaskManagerServices.fromConfiguration(
       taskManagerServicesConfiguration,
-      resourceID)
-
-    val metricRegistry = taskManagerServices.getMetricRegistry()
+      resourceID,
+      metricRegistry)
 
     // create the actor properties (which define the actor constructor parameters)
     val tmProps = getTaskManagerProps(
@@ -2013,9 +2009,7 @@ object TaskManager {
       taskManagerServices.getIOManager(),
       taskManagerServices.getNetworkEnvironment(),
       highAvailabilityServices,
-      metricRegistry)
-
-    metricRegistry.startQueryService(actorSystem, resourceID)
+      taskManagerServices.getTaskManagerMetricGroup)
 
     taskManagerActorName match {
       case Some(actorName) => actorSystem.actorOf(tmProps, actorName)
@@ -2032,7 +2026,7 @@ object TaskManager {
     ioManager: IOManager,
     networkEnvironment: NetworkEnvironment,
     highAvailabilityServices: HighAvailabilityServices,
-    metricsRegistry: FlinkMetricRegistry
+    taskManagerMetricGroup: TaskManagerMetricGroup
   ): Props = {
     Props(
       taskManagerClass,
@@ -2044,7 +2038,7 @@ object TaskManager {
       networkEnvironment,
       taskManagerConfig.getNumberSlots(),
       highAvailabilityServices,
-      metricsRegistry)
+      taskManagerMetricGroup)
   }
 
   // --------------------------------------------------------------------------