You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/09/15 17:18:39 UTC

[2/4] flink git commit: [FLINK-4389] Expose metrics to WebFrontend

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
new file mode 100644
index 0000000..143faaf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.dump;
+
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JM;
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JOB;
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR;
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK;
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM;
+
+/**
+ * Utility class for the serialization of metrics.
+ */
+public class MetricDumpSerialization {
+	private static final Logger LOG = LoggerFactory.getLogger(MetricDumpSerialization.class);
+
+	private MetricDumpSerialization() {
+	}
+
+	//-------------------------------------------------------------------------
+	// Serialization
+	//-------------------------------------------------------------------------
+	public static class MetricDumpSerializer {
+		private ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+		private DataOutputStream dos = new DataOutputStream(baos);
+
+		/**
+		 * Serializes the given metrics and returns the resulting byte array.
+		 *
+		 * @param counters   counters to serialize
+		 * @param gauges     gauges to serialize
+		 * @param histograms histograms to serialize
+		 * @param meters     meters to serialize
+		 * @return byte array containing the serialized metrics
+		 * @throws IOException if writing a metric to the underlying stream fails
+		 */
+		public byte[] serialize(
+			Map<Counter, Tuple2<QueryScopeInfo, String>> counters,
+			Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges,
+			Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms,
+			Map<Meter, Tuple2<QueryScopeInfo, String>> meters) throws IOException {
+			baos.reset(); // the buffer is reused across calls; drop the previous dump
+			dos.writeInt(counters.size()); // header: one count per metric type, in the order the sections are written below
+			dos.writeInt(gauges.size());
+			dos.writeInt(histograms.size());
+			dos.writeInt(meters.size());
+
+			for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>> entry : counters.entrySet()) {
+				serializeMetricInfo(dos, entry.getValue().f0);
+				serializeString(dos, entry.getValue().f1);
+				serializeCounter(dos, entry.getKey());
+			}
+
+			for (Map.Entry<Gauge<?>, Tuple2<QueryScopeInfo, String>> entry : gauges.entrySet()) {
+				serializeMetricInfo(dos, entry.getValue().f0);
+				serializeString(dos, entry.getValue().f1);
+				serializeGauge(dos, entry.getKey());
+			}
+
+			for (Map.Entry<Histogram, Tuple2<QueryScopeInfo, String>> entry : histograms.entrySet()) {
+				serializeMetricInfo(dos, entry.getValue().f0);
+				serializeString(dos, entry.getValue().f1);
+				serializeHistogram(dos, entry.getKey());
+			}
+
+			for (Map.Entry<Meter, Tuple2<QueryScopeInfo, String>> entry : meters.entrySet()) {
+				serializeMetricInfo(dos, entry.getValue().f0);
+				serializeString(dos, entry.getValue().f1);
+				serializeMeter(dos, entry.getKey());
+			}
+			return baos.toByteArray();
+		}
+
+		public void close() {
+			try {
+				dos.close();
+			} catch (Exception e) {
+				LOG.debug("Failed to close OutputStream.", e);
+			}
+			try {
+				baos.close();
+			} catch (Exception e) {
+				LOG.debug("Failed to close OutputStream.", e);
+			}
+		}
+	}
+
+	private static void serializeMetricInfo(DataOutputStream dos, QueryScopeInfo info) throws IOException {
+		serializeString(dos, info.scope);
+		dos.writeByte(info.getCategory());
+		switch (info.getCategory()) {
+			case INFO_CATEGORY_JM:
+				break;
+			case INFO_CATEGORY_TM:
+				String tmID = ((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID;
+				serializeString(dos, tmID);
+				break;
+			case INFO_CATEGORY_JOB:
+				QueryScopeInfo.JobQueryScopeInfo jobInfo = (QueryScopeInfo.JobQueryScopeInfo) info;
+				serializeString(dos, jobInfo.jobID);
+				break;
+			case INFO_CATEGORY_TASK:
+				QueryScopeInfo.TaskQueryScopeInfo taskInfo = (QueryScopeInfo.TaskQueryScopeInfo) info;
+				serializeString(dos, taskInfo.jobID);
+				serializeString(dos, taskInfo.vertexID);
+				dos.writeInt(taskInfo.subtaskIndex);
+				break;
+			case INFO_CATEGORY_OPERATOR:
+				QueryScopeInfo.OperatorQueryScopeInfo operatorInfo = (QueryScopeInfo.OperatorQueryScopeInfo) info;
+				serializeString(dos, operatorInfo.jobID);
+				serializeString(dos, operatorInfo.vertexID);
+				dos.writeInt(operatorInfo.subtaskIndex);
+				serializeString(dos, operatorInfo.operatorName);
+				break;
+		}
+	}
+
+	private static void serializeString(DataOutputStream dos, String string) throws IOException {
+		byte[] bytes = string.getBytes(java.nio.charset.StandardCharsets.UTF_8); // fixed charset: sender and receiver JVMs may have different defaults
+		dos.writeInt(bytes.length); // length prefix in bytes, not chars
+		dos.write(bytes);
+	}
+
+	private static void serializeCounter(DataOutputStream dos, Counter counter) throws IOException {
+		dos.writeLong(counter.getCount());
+	}
+
+	private static void serializeGauge(DataOutputStream dos, Gauge<?> gauge) throws IOException {
+		serializeString(dos, String.valueOf(gauge.getValue())); // null-safe: a null gauge value is written as "null" instead of aborting the dump
+	}
+
+	private static void serializeHistogram(DataOutputStream dos, Histogram histogram) throws IOException {
+		HistogramStatistics stat = histogram.getStatistics(); // field order below must match deserializeHistogram
+
+		dos.writeLong(stat.getMin());
+		dos.writeLong(stat.getMax());
+		dos.writeDouble(stat.getMean());
+		dos.writeDouble(stat.getQuantile(0.5));
+		dos.writeDouble(stat.getStdDev());
+		dos.writeDouble(stat.getQuantile(0.75));
+		dos.writeDouble(stat.getQuantile(0.90));
+		dos.writeDouble(stat.getQuantile(0.95));
+		dos.writeDouble(stat.getQuantile(0.98));
+		dos.writeDouble(stat.getQuantile(0.99));
+		dos.writeDouble(stat.getQuantile(0.999));
+	}
+
+	private static void serializeMeter(DataOutputStream dos, Meter meter) throws IOException {
+		dos.writeDouble(meter.getRate());
+	}
+
+	//-------------------------------------------------------------------------
+	// Deserialization
+	//-------------------------------------------------------------------------
+	public static class MetricDumpDeserializer {
+		/**
+		 * De-serializes metrics from the given byte array and returns them as a list of {@link MetricDump}.
+		 *
+		 * @param data serialized metrics
+		 * @return A list containing the deserialized metrics.
+		 * @throws IOException if the data ends prematurely or contains an unknown scope category
+		 */
+		public List<MetricDump> deserialize(byte[] data) throws IOException {
+			ByteArrayInputStream bais = new ByteArrayInputStream(data);
+			DataInputStream dis = new DataInputStream(bais);
+
+			int numCounters = dis.readInt(); // header: counts in the order written by MetricDumpSerializer#serialize
+			int numGauges = dis.readInt();
+			int numHistograms = dis.readInt();
+			int numMeters = dis.readInt();
+
+			List<MetricDump> metrics = new ArrayList<>(numCounters + numGauges + numHistograms + numMeters);
+
+			for (int x = 0; x < numCounters; x++) {
+				metrics.add(deserializeCounter(dis));
+			}
+
+			for (int x = 0; x < numGauges; x++) {
+				metrics.add(deserializeGauge(dis));
+			}
+
+			for (int x = 0; x < numHistograms; x++) {
+				metrics.add(deserializeHistogram(dis));
+			}
+
+			for (int x = 0; x < numMeters; x++) {
+				metrics.add(deserializeMeter(dis));
+			}
+
+			return metrics;
+		}
+	}
+
+	private static String deserializeString(DataInputStream dis) throws IOException {
+		int stringLength = dis.readInt();
+		byte[] bytes = new byte[stringLength];
+		dis.readFully(bytes);
+		return new String(bytes, java.nio.charset.StandardCharsets.UTF_8); // same fixed charset as serializeString
+	}
+
+	private static MetricDump.CounterDump deserializeCounter(DataInputStream dis) throws IOException {
+		QueryScopeInfo scope = deserializeMetricInfo(dis);
+		String name = deserializeString(dis);
+		return new MetricDump.CounterDump(scope, name, dis.readLong());
+	}
+
+	private static MetricDump.GaugeDump deserializeGauge(DataInputStream dis) throws IOException {
+		QueryScopeInfo scope = deserializeMetricInfo(dis);
+		String name = deserializeString(dis);
+		String value = deserializeString(dis);
+		return new MetricDump.GaugeDump(scope, name, value);
+	}
+
+	private static MetricDump.HistogramDump deserializeHistogram(DataInputStream dis) throws IOException {
+		QueryScopeInfo info = deserializeMetricInfo(dis);
+		String name = deserializeString(dis);
+		long min = dis.readLong();
+		long max = dis.readLong();
+		double mean = dis.readDouble();
+		double median = dis.readDouble();
+		double stddev = dis.readDouble();
+		double p75 = dis.readDouble();
+		double p90 = dis.readDouble();
+		double p95 = dis.readDouble();
+		double p98 = dis.readDouble();
+		double p99 = dis.readDouble();
+		double p999 = dis.readDouble();
+		return new MetricDump.HistogramDump(info, name, min, max, mean, median, stddev, p75, p90, p95, p98, p99, p999);
+	}
+
+	private static MetricDump.MeterDump deserializeMeter(DataInputStream dis) throws IOException {
+		QueryScopeInfo info = deserializeMetricInfo(dis);
+		String name = deserializeString(dis);
+		double rate = dis.readDouble();
+		return new MetricDump.MeterDump(info, name, rate);
+	}
+
+	private static QueryScopeInfo deserializeMetricInfo(DataInputStream dis) throws IOException {
+		String jobID;
+		String vertexID;
+		int subtaskIndex;
+
+		String scope = deserializeString(dis); // layout mirrors serializeMetricInfo: scope, category byte, category-specific fields
+		byte cat = dis.readByte();
+		switch (cat) {
+			case INFO_CATEGORY_JM:
+				return new QueryScopeInfo.JobManagerQueryScopeInfo(scope);
+			case INFO_CATEGORY_TM:
+				String tmID = deserializeString(dis);
+				return new QueryScopeInfo.TaskManagerQueryScopeInfo(tmID, scope);
+			case INFO_CATEGORY_JOB:
+				jobID = deserializeString(dis);
+				return new QueryScopeInfo.JobQueryScopeInfo(jobID, scope);
+			case INFO_CATEGORY_TASK:
+				jobID = deserializeString(dis);
+				vertexID = deserializeString(dis);
+				subtaskIndex = dis.readInt();
+				return new QueryScopeInfo.TaskQueryScopeInfo(jobID, vertexID, subtaskIndex, scope);
+			case INFO_CATEGORY_OPERATOR:
+				jobID = deserializeString(dis);
+				vertexID = deserializeString(dis);
+				subtaskIndex = dis.readInt();
+				String operatorName = deserializeString(dis);
+				return new QueryScopeInfo.OperatorQueryScopeInfo(jobID, vertexID, subtaskIndex, operatorName, scope);
+			default:
+				throw new IOException("Unknown scope category: " + cat); // corrupt or incompatible dump; name the offending byte
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
new file mode 100644
index 0000000..6e0b443
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.dump;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.Status;
+import akka.actor.UntypedActor;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpSerializer;
+
+/**
+ * The MetricQueryService creates a key-value representation of all metrics currently registered with Flink when queried.
+ *
+ * It is realized as an actor and can be notified of
+ * - an added metric by calling {@link MetricQueryService#notifyOfAddedMetric(ActorRef, Metric, String, AbstractMetricGroup)}
+ * - a removed metric by calling {@link MetricQueryService#notifyOfRemovedMetric(ActorRef, Metric)}
+ * - a metric dump request by sending the return value of {@link MetricQueryService#getCreateDump()}
+ */
+public class MetricQueryService extends UntypedActor {
+	private static final Logger LOG = LoggerFactory.getLogger(MetricQueryService.class);
+
+	public static final String METRIC_QUERY_SERVICE_NAME = "MetricQueryService";
+
+	private static final CharacterFilter FILTER = new CharacterFilter() {
+		@Override
+		public String filterCharacters(String input) {
+			return replaceInvalidChars(input);
+		}
+	};
+
+	private final MetricDumpSerializer serializer = new MetricDumpSerializer();
+
+	private final Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new HashMap<>();
+	private final Map<Counter, Tuple2<QueryScopeInfo, String>> counters = new HashMap<>();
+	private final Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>();
+	private final Map<Meter, Tuple2<QueryScopeInfo, String>> meters = new HashMap<>();
+
+	@Override
+	public void postStop() {
+		serializer.close();
+	}
+
+	@Override
+	public void onReceive(Object message) {
+		try {
+			if (message instanceof AddMetric) {
+				AddMetric added = (AddMetric) message;
+
+				String metricName = added.metricName;
+				Metric metric = added.metric;
+				AbstractMetricGroup group = added.group;
+
+				QueryScopeInfo info = group.getQueryServiceMetricInfo(FILTER);
+
+				if (metric instanceof Counter) {
+					counters.put((Counter) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
+				} else if (metric instanceof Gauge) {
+					gauges.put((Gauge<?>) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
+				} else if (metric instanceof Histogram) {
+					histograms.put((Histogram) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
+				} else if (metric instanceof Meter) {
+					meters.put((Meter) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
+				}
+			} else if (message instanceof RemoveMetric) {
+				Metric metric = (((RemoveMetric) message).metric);
+				if (metric instanceof Counter) {
+					this.counters.remove(metric);
+				} else if (metric instanceof Gauge) {
+					this.gauges.remove(metric);
+				} else if (metric instanceof Histogram) {
+					this.histograms.remove(metric);
+				} else if (metric instanceof Meter) {
+					this.meters.remove(metric);
+				}
+			} else if (message instanceof CreateDump) {
+				byte[] dump = serializer.serialize(counters, gauges, histograms, meters);
+				getSender().tell(dump, getSelf());
+			} else {
+				LOG.warn("MetricQueryServiceActor received an invalid message. " + message.toString());
+				getSender().tell(new Status.Failure(new IOException("MetricQueryServiceActor received an invalid message. " + message.toString())), getSelf());
+			}
+		} catch (Exception e) {
+			LOG.warn("An exception occurred while processing a message.", e);
+		}
+	}
+
+	/**
+	 * Lightweight method to replace unsupported characters.
+	 * If the string does not contain any unsupported characters, this method creates no
+	 * new string (and in fact no new objects at all).
+	 *
+	 * <p>Replacements:
+	 *
+	 * <ul>
+	 *     <li>{@code space : . ,} are replaced by {@code _} (underscore)</li>
+	 * </ul>
+	 */
+	static String replaceInvalidChars(String str) {
+		char[] chars = null;
+		final int strLen = str.length();
+		int pos = 0;
+
+		for (int i = 0; i < strLen; i++) {
+			final char c = str.charAt(i);
+			switch (c) {
+				case ' ':
+				case '.':
+				case ':':
+				case ',':
+					if (chars == null) {
+						chars = str.toCharArray();
+					}
+					chars[pos++] = '_';
+					break;
+				default:
+					if (chars != null) {
+						chars[pos] = c;
+					}
+					pos++;
+			}
+		}
+
+		return chars == null ? str : new String(chars, 0, pos);
+	}
+
+	/**
+	 * Starts the MetricQueryService actor in the given actor system.
+	 *
+	 * @param actorSystem The actor system running the MetricQueryService
+	 * @return actor reference to the MetricQueryService
+	 */
+	public static ActorRef startMetricQueryService(ActorSystem actorSystem) {
+		return actorSystem.actorOf(Props.create(MetricQueryService.class), METRIC_QUERY_SERVICE_NAME);
+	}
+
+	/**
+	 * Utility method to notify a MetricQueryService of an added metric.
+	 *
+	 * @param service    MetricQueryService to notify
+	 * @param metric     added metric
+	 * @param metricName metric name
+	 * @param group      group the metric was added on
+	 */
+	public static void notifyOfAddedMetric(ActorRef service, Metric metric, String metricName, AbstractMetricGroup group) {
+		service.tell(new AddMetric(metricName, metric, group), ActorRef.noSender()); // fire-and-forget: onReceive sends no reply for AddMetric
+	}
+
+	/**
+	 * Utility method to notify a MetricQueryService of a removed metric.
+	 *
+	 * @param service MetricQueryService to notify
+	 * @param metric  removed metric
+	 */
+	public static void notifyOfRemovedMetric(ActorRef service, Metric metric) {
+		service.tell(new RemoveMetric(metric), ActorRef.noSender()); // fire-and-forget: onReceive sends no reply for RemoveMetric
+	}
+
+	private static class AddMetric {
+		private final String metricName;
+		private final Metric metric;
+		private final AbstractMetricGroup group;
+
+		private AddMetric(String metricName, Metric metric, AbstractMetricGroup group) {
+			this.metricName = metricName;
+			this.metric = metric;
+			this.group = group;
+		}
+	}
+
+	private static class RemoveMetric {
+		private final Metric metric;
+
+		private RemoveMetric(Metric metric) {
+			this.metric = metric;
+		}
+	}
+
+	public static Object getCreateDump() {
+		return CreateDump.INSTANCE;
+	}
+
+	private static class CreateDump implements Serializable { // marker message requesting a metric dump; onReceive matches it via instanceof
+		private static final CreateDump INSTANCE = new CreateDump(); // shared instance handed out by getCreateDump(); never reassigned
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
new file mode 100644
index 0000000..df5c2bf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.dump;
+
+/**
+ * Container for scope related information as required by the MetricQueryService.
+ */
+public abstract class QueryScopeInfo {
+	/** Categories to be returned by {@link QueryScopeInfo#getCategory()} to avoid instanceof checks. */
+	public static final byte INFO_CATEGORY_JM = 0;
+	public static final byte INFO_CATEGORY_TM = 1;
+	public static final byte INFO_CATEGORY_JOB = 2;
+	public static final byte INFO_CATEGORY_TASK = 3;
+	public static final byte INFO_CATEGORY_OPERATOR = 4;
+
+	/** The remaining scope not covered by specific fields; nested scopes are joined with '.'. */
+	public final String scope;
+
+	private QueryScopeInfo(String scope) {
+		this.scope = scope;
+	}
+
+	/**
+	 * Create a copy of this QueryScopeInfo with the given scope appended, separated from a non-empty existing scope by '.'.
+	 *
+	 * @param userScope scope to append
+	 * @return modified copy of this QueryScopeInfo
+	 */
+	public abstract QueryScopeInfo copy(String userScope);
+
+	/**
+	 * Returns the category for this QueryScopeInfo.
+	 *
+	 * @return category
+	 */
+	public abstract byte getCategory();
+
+	/**
+	 * Container for the job manager scope. Stores no additional information.
+	 */
+	public static class JobManagerQueryScopeInfo extends QueryScopeInfo {
+		public JobManagerQueryScopeInfo() {
+			super("");
+		}
+
+		public JobManagerQueryScopeInfo(String scope) {
+			super(scope);
+		}
+
+		@Override
+		public JobManagerQueryScopeInfo copy(String additionalScope) {
+			return new JobManagerQueryScopeInfo(this.scope.isEmpty() ? additionalScope : this.scope + "." + additionalScope);
+		}
+
+		@Override
+		public byte getCategory() {
+			return INFO_CATEGORY_JM;
+		}
+	}
+
+	/**
+	 * Container for the task manager scope. Stores the ID of the task manager.
+	 */
+	public static class TaskManagerQueryScopeInfo extends QueryScopeInfo {
+		public final String taskManagerID;
+
+		public TaskManagerQueryScopeInfo(String taskManagerId) {
+			this(taskManagerId, "");
+		}
+
+		public TaskManagerQueryScopeInfo(String taskManagerId, String scope) {
+			super(scope);
+			this.taskManagerID = taskManagerId;
+		}
+
+		@Override
+		public TaskManagerQueryScopeInfo copy(String additionalScope) {
+			return new TaskManagerQueryScopeInfo(this.taskManagerID, this.scope.isEmpty() ? additionalScope : this.scope + "." + additionalScope);
+		}
+
+		@Override
+		public byte getCategory() {
+			return INFO_CATEGORY_TM;
+		}
+	}
+
+	/**
+	 * Container for the job scope. Stores the ID of the job.
+	 */
+	public static class JobQueryScopeInfo extends QueryScopeInfo {
+		public final String jobID;
+
+		public JobQueryScopeInfo(String jobID) {
+			this(jobID, "");
+		}
+
+		public JobQueryScopeInfo(String jobID, String scope) {
+			super(scope);
+			this.jobID = jobID;
+		}
+
+		@Override
+		public JobQueryScopeInfo copy(String additionalScope) {
+			return new JobQueryScopeInfo(this.jobID, this.scope.isEmpty() ? additionalScope : this.scope + "." + additionalScope);
+		}
+
+		@Override
+		public byte getCategory() {
+			return INFO_CATEGORY_JOB;
+		}
+	}
+
+	/**
+	 * Container for the task scope. Stores the ID of the job/vertex and subtask index.
+	 */
+	public static class TaskQueryScopeInfo extends QueryScopeInfo {
+		public final String jobID;
+		public final String vertexID;
+		public final int subtaskIndex;
+
+		public TaskQueryScopeInfo(String jobID, String vertexid, int subtaskIndex) {
+			this(jobID, vertexid, subtaskIndex, "");
+		}
+
+		public TaskQueryScopeInfo(String jobID, String vertexid, int subtaskIndex, String scope) {
+			super(scope);
+			this.jobID = jobID;
+			this.vertexID = vertexid;
+			this.subtaskIndex = subtaskIndex;
+		}
+
+		@Override
+		public TaskQueryScopeInfo copy(String additionalScope) {
+			return new TaskQueryScopeInfo(this.jobID, this.vertexID, this.subtaskIndex, this.scope.isEmpty() ? additionalScope : this.scope + "." + additionalScope);
+		}
+
+		@Override
+		public byte getCategory() {
+			return INFO_CATEGORY_TASK;
+		}
+	}
+
+	/**
+	 * Container for the operator scope. Stores the ID of the job/vertex, the subtask index and the name of the operator.
+	 */
+	public static class OperatorQueryScopeInfo extends QueryScopeInfo {
+		public final String jobID;
+		public final String vertexID;
+		public final int subtaskIndex;
+		public final String operatorName;
+
+		public OperatorQueryScopeInfo(String jobID, String vertexid, int subtaskIndex, String operatorName) {
+			this(jobID, vertexid, subtaskIndex, operatorName, "");
+		}
+
+		public OperatorQueryScopeInfo(String jobID, String vertexid, int subtaskIndex, String operatorName, String scope) {
+			super(scope);
+			this.jobID = jobID;
+			this.vertexID = vertexid;
+			this.subtaskIndex = subtaskIndex;
+			this.operatorName = operatorName;
+		}
+
+		@Override
+		public OperatorQueryScopeInfo copy(String additionalScope) {
+			return new OperatorQueryScopeInfo(this.jobID, this.vertexID, this.subtaskIndex, this.operatorName, this.scope.isEmpty() ? additionalScope : this.scope + "." + additionalScope);
+		}
+
+		@Override
+		public byte getCategory() {
+			return INFO_CATEGORY_OPERATOR;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
index 89fe3cd..75476a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
@@ -27,6 +27,7 @@ import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -86,6 +87,9 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
 	 * For example: "host-7.taskmanager-2.window_word_count.my-mapper" */
 	private String scopeString;
 
+	/** The metrics query service scope represented by this group, lazily computed. */
+	protected QueryScopeInfo queryServiceScopeInfo;
+
 	/** Flag indicating whether this group has been closed */
 	private volatile boolean closed;
 
@@ -123,6 +127,27 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
 	}
 
 	/**
+	 * Returns the metric query service scope for this group.
+	 * 
+	 * @param filter character filter
+	 * @return query service scope
+     */
+	public QueryScopeInfo getQueryServiceMetricInfo(CharacterFilter filter) {
+		if (queryServiceScopeInfo == null) {
+			queryServiceScopeInfo = createQueryServiceMetricInfo(filter);
+		}
+		return queryServiceScopeInfo;
+	}
+
+	/**
+	 * Creates the metric query service scope for this group.
+	 *
+	 * @param filter character filter
+	 * @return query service scope
+     */
+	protected abstract QueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter);
+
+	/**
 	 * Returns the fully qualified metric name, for example
 	 * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}
 	 * 

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java
index 569ad0f..ab6418d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java
@@ -18,16 +18,26 @@
 
 package org.apache.flink.runtime.metrics.groups;
 
+import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 
 /**
  * A simple named {@link org.apache.flink.metrics.MetricGroup} that is used to hold
  * subgroups of metrics.
  */
 public class GenericMetricGroup extends AbstractMetricGroup<AbstractMetricGroup<?>> {
+	/** The name of this group */
+	private String name;
 
 	public GenericMetricGroup(MetricRegistry registry, AbstractMetricGroup parent, String name) {
 		super(registry, makeScopeComponents(parent, name), parent);
+		this.name = name;
+	}
+
+	@Override
+	protected QueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) {
+		return parent.getQueryServiceMetricInfo(filter).copy(filter.filterCharacters(this.name));
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
index da0d8f8..2f6b07a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
@@ -18,8 +18,10 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 
 import java.util.HashMap;
@@ -46,6 +48,18 @@ public class JobManagerMetricGroup extends ComponentMetricGroup<JobManagerMetric
 		return hostname;
 	}
 
+	/**
+	 * Builds the query scope info describing this job manager group.
+	 *
+	 * @param filter character filter handed in by the caller; this group does not use it
+	 * @return a newly created {@link QueryScopeInfo.JobManagerQueryScopeInfo}
+	 */
+	@Override
+	protected QueryScopeInfo.JobManagerQueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) {
+		final QueryScopeInfo.JobManagerQueryScopeInfo scopeInfo = new QueryScopeInfo.JobManagerQueryScopeInfo();
+		return scopeInfo;
+	}
+
 	// ------------------------------------------------------------------------
 	//  job groups
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java
index e101d2f..091807f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 
 import javax.annotation.Nullable;
@@ -65,6 +67,18 @@ public abstract class JobMetricGroup<C extends ComponentMetricGroup<C>> extends
 		return jobName;
 	}
 
+	/**
+	 * Builds the query scope info describing this job group from the string form of its job ID.
+	 *
+	 * @param filter character filter handed in by the caller; this group does not use it
+	 * @return a newly created {@link QueryScopeInfo.JobQueryScopeInfo}
+	 */
+	@Override
+	protected QueryScopeInfo.JobQueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) {
+		final String jobIdString = this.jobId.toString();
+		return new QueryScopeInfo.JobQueryScopeInfo(jobIdString);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Component Metric Group Specifics
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
index 9352faf..1ed55d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.runtime.metrics.groups;
 
+import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 
 import java.util.Collections;
@@ -42,6 +44,26 @@ public class OperatorMetricGroup extends ComponentMetricGroup<TaskMetricGroup> {
 	public final TaskMetricGroup parent() {
 		return parent;
 	}
+
+	/**
+	 * Builds the query scope info describing this operator group from the job ID, vertex ID and
+	 * subtask index reachable through the parent task group, plus this operator's filtered name.
+	 *
+	 * <p>NOTE(review): {@code vertexId} is declared {@code @Nullable} in {@link TaskMetricGroup};
+	 * a null vertex ID would raise a {@link NullPointerException} here - confirm it cannot occur.
+	 *
+	 * @param filter character filter applied to the operator name
+	 * @return a newly created {@link QueryScopeInfo.OperatorQueryScopeInfo}
+	 */
+	@Override
+	protected QueryScopeInfo.OperatorQueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) {
+		final TaskMetricGroup task = this.parent;
+		final String jobIdString = task.parent.jobId.toString();
+		final String vertexIdString = task.vertexId.toString();
+		final int subtask = task.subtaskIndex;
+		final String filteredName = filter.filterCharacters(this.operatorName);
+		return new QueryScopeInfo.OperatorQueryScopeInfo(jobIdString, vertexIdString, subtask, filteredName);
+	}
 	
 	// ------------------------------------------------------------------------
 	//  Component Metric Group Specifics

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
index 8f81cfd..5bdd014 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
@@ -19,8 +19,10 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 
 import java.util.HashMap;
@@ -55,6 +57,18 @@ public class TaskManagerMetricGroup extends ComponentMetricGroup<TaskManagerMetr
 		return taskManagerId;
 	}
 
+	/**
+	 * Builds the query scope info describing this task manager group from its task manager ID.
+	 *
+	 * @param filter character filter handed in by the caller; this group does not use it
+	 * @return a newly created {@link QueryScopeInfo.TaskManagerQueryScopeInfo}
+	 */
+	@Override
+	protected QueryScopeInfo.TaskManagerQueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) {
+		final String id = this.taskManagerId;
+		return new QueryScopeInfo.TaskManagerQueryScopeInfo(id);
+	}
+
 	// ------------------------------------------------------------------------
 	//  job groups
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
index 78fec97..0e76ab0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.runtime.metrics.groups;
 
+import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 import org.apache.flink.util.AbstractID;
 
@@ -43,12 +45,12 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
 	private final AbstractID executionId;
 
 	@Nullable
-	private final AbstractID vertexId;
+	protected final AbstractID vertexId;
 	
 	@Nullable
 	private final String taskName;
 
-	private final int subtaskIndex;
+	protected final int subtaskIndex;
 
 	private final int attemptNumber;
 
@@ -113,6 +115,23 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
 		return ioMetrics;
 	}
 
+	/**
+	 * Builds the query scope info describing this task group from the parent group's job ID and
+	 * this task's vertex ID and subtask index.
+	 *
+	 * <p>NOTE(review): {@code vertexId} is declared {@code @Nullable}; a null value would raise a
+	 * {@link NullPointerException} here - confirm it cannot occur.
+	 *
+	 * @param filter character filter handed in by the caller; this group does not use it
+	 * @return a newly created {@link QueryScopeInfo.TaskQueryScopeInfo}
+	 */
+	@Override
+	protected QueryScopeInfo.TaskQueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) {
+		final String jobIdString = this.parent.jobId.toString();
+		final String vertexIdString = this.vertexId.toString();
+		return new QueryScopeInfo.TaskQueryScopeInfo(jobIdString, vertexIdString, this.subtaskIndex);
+	}
+
 	// ------------------------------------------------------------------------
 	//  operators and cleanup
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index f67be0e..1c68874 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2750,6 +2750,12 @@ object JobManager {
       case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName)
       case None => actorSystem.actorOf(jobManagerProps)
     }
+    
+    metricsRegistry match {
+      case Some(registry) =>
+        registry.startQueryService(actorSystem)
+      case None =>
+    }
 
     (jobManager, archive)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index cac5d91..27c9dd9 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -203,7 +203,8 @@ class LocalFlinkMiniCluster(
     memoryManager,
     ioManager,
     network,
-    leaderRetrievalService) = TaskManager.createTaskManagerComponents(
+    leaderRetrievalService,
+    metricsRegistry) = TaskManager.createTaskManagerComponents(
       config,
       resourceID,
       hostname, // network interface to bind to
@@ -218,7 +219,10 @@ class LocalFlinkMiniCluster(
       memoryManager,
       ioManager,
       network,
-      leaderRetrievalService)
+      leaderRetrievalService,
+      metricsRegistry)
+
+    metricsRegistry.startQueryService(system)
 
     system.actorOf(props, taskManagerActorName)
   }
@@ -274,7 +278,8 @@ class LocalFlinkMiniCluster(
     memoryManager: MemoryManager,
     ioManager: IOManager,
     networkEnvironment: NetworkEnvironment,
-    leaderRetrievalService: LeaderRetrievalService): Props = {
+    leaderRetrievalService: LeaderRetrievalService,
+    metricsRegistry: MetricRegistry): Props = {
 
     TaskManager.getTaskManagerProps(
       taskManagerClass,
@@ -284,7 +289,8 @@ class LocalFlinkMiniCluster(
       memoryManager,
       ioManager,
       networkEnvironment,
-      leaderRetrievalService)
+      leaderRetrievalService,
+      metricsRegistry)
   }
 
   def getResourceManagerProps(

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 8ebdd80..c882631 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -132,7 +132,8 @@ class TaskManager(
     protected val ioManager: IOManager,
     protected val network: NetworkEnvironment,
     protected val numberOfSlots: Int,
-    protected val leaderRetrievalService: LeaderRetrievalService)
+    protected val leaderRetrievalService: LeaderRetrievalService,
+    protected val metricsRegistry: FlinkMetricRegistry)
   extends FlinkActor
   with LeaderSessionMessageFilter // Mixin order is important: We want to filter after logging
   with LogMessages // Mixin order is important: first we want to support message logging
@@ -158,7 +159,6 @@ class TaskManager(
   /** Registry of metrics periodically transmitted to the JobManager */
   private val metricRegistry = TaskManager.createMetricsRegistry()
 
-  private var metricsRegistry : FlinkMetricRegistry = _
   private var taskManagerMetricGroup : TaskManagerMetricGroup = _
 
   /** Metric serialization */
@@ -276,11 +276,7 @@ class TaskManager(
     
     // failsafe shutdown of the metrics registry
     try {
-      val reg = metricsRegistry
-      metricsRegistry = null
-      if (reg != null) {
-        reg.shutdown()
-      }
+      metricsRegistry.shutdown()
     } catch {
       case t: Exception => log.error("MetricRegistry did not shutdown properly.", t)
     }
@@ -985,8 +981,6 @@ class TaskManager(
     else {
       libraryCacheManager = Some(new FallbackLibraryCacheManager)
     }
-
-    metricsRegistry = new FlinkMetricRegistry(config.configuration)
     
     taskManagerMetricGroup = 
       new TaskManagerMetricGroup(metricsRegistry, this.runtimeInfo.getHostname, id.toString)
@@ -1064,9 +1058,12 @@ class TaskManager(
       network.getKvStateRegistry.unregisterListener()
     }
     
-    // stop the metrics reporters
-    metricsRegistry.shutdown()
-    metricsRegistry = null
+    // failsafe shutdown of the metrics registry
+    try {
+      metricsRegistry.shutdown()
+    } catch {
+      case t: Exception => log.error("MetricRegistry did not shutdown properly.", t)
+    }
   }
 
   protected def handleJobManagerDisconnect(jobManager: ActorRef, msg: String): Unit = {
@@ -1849,7 +1846,8 @@ object TaskManager {
       memoryManager,
       ioManager,
       network,
-      leaderRetrievalService) = createTaskManagerComponents(
+      leaderRetrievalService,
+      metricsRegistry) = createTaskManagerComponents(
       configuration,
       resourceID,
       taskManagerHostname,
@@ -1865,7 +1863,10 @@ object TaskManager {
       memoryManager,
       ioManager,
       network,
-      leaderRetrievalService)
+      leaderRetrievalService,
+      metricsRegistry)
+
+    metricsRegistry.startQueryService(actorSystem)
 
     taskManagerActorName match {
       case Some(actorName) => actorSystem.actorOf(tmProps, actorName)
@@ -1881,7 +1882,8 @@ object TaskManager {
     memoryManager: MemoryManager,
     ioManager: IOManager,
     networkEnvironment: NetworkEnvironment,
-    leaderRetrievalService: LeaderRetrievalService
+    leaderRetrievalService: LeaderRetrievalService,
+    metricsRegistry: FlinkMetricRegistry
   ): Props = {
     Props(
       taskManagerClass,
@@ -1892,7 +1894,8 @@ object TaskManager {
       ioManager,
       networkEnvironment,
       taskManagerConfig.numberOfSlots,
-      leaderRetrievalService)
+      leaderRetrievalService,
+      metricsRegistry)
   }
 
   def createTaskManagerComponents(
@@ -1906,7 +1909,8 @@ object TaskManager {
       MemoryManager,
       IOManager,
       NetworkEnvironment,
-      LeaderRetrievalService) = {
+      LeaderRetrievalService,
+      FlinkMetricRegistry) = {
 
     val (taskManagerConfig : TaskManagerConfiguration,
     netConfig: NetworkEnvironmentConfiguration,
@@ -2081,12 +2085,15 @@ object TaskManager {
       case None => LeaderRetrievalUtils.createLeaderRetrievalService(configuration)
     }
 
+    val metricsRegistry = new FlinkMetricRegistry(configuration)
+
     (taskManagerConfig,
       taskManagerLocation,
       memoryManager,
       ioManager,
       network,
-      leaderRetrievalService)
+      leaderRetrievalService,
+      metricsRegistry)
   }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
new file mode 100644
index 0000000..bc0f005
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.dump;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.metrics.util.TestingHistogram;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_COUNTER;
+import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_GAUGE;
+import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_HISTOGRAM;
+import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_METER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+
+/**
+ * Round-trip test for {@link MetricDumpSerialization}: metrics of every category are serialized
+ * and the deserialized {@link MetricDump}s are compared against the original values and scopes.
+ */
+public class MetricDumpSerializerTest {
+	@Test
+	public void testSerialization() throws IOException {
+		MetricDumpSerialization.MetricDumpSerializer serializer = new MetricDumpSerialization.MetricDumpSerializer();
+		MetricDumpSerialization.MetricDumpDeserializer deserializer = new MetricDumpSerialization.MetricDumpDeserializer();
+
+		Map<Counter, Tuple2<QueryScopeInfo, String>> counters = new HashMap<>();
+		Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new HashMap<>();
+		Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>();
+		Map<Meter, Tuple2<QueryScopeInfo, String>> meters = new HashMap<>();
+
+		SimpleCounter c1 = new SimpleCounter();
+		SimpleCounter c2 = new SimpleCounter();
+
+		c1.inc(1);
+		c2.inc(2);
+
+		Gauge<Integer> g1 = new Gauge<Integer>() {
+			@Override
+			public Integer getValue() {
+				return 4;
+			}
+		};
+
+		Histogram h1 = new TestingHistogram();
+
+		Meter m1 = new Meter() {
+			@Override
+			public void markEvent() {
+			}
+
+			@Override
+			public void markEvent(long n) {
+			}
+
+			@Override
+			public double getRate() {
+				return 5;
+			}
+
+			@Override
+			public long getCount() {
+				return 10;
+			}
+		};
+
+		counters.put(c1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.JobManagerQueryScopeInfo("A"), "c1"));
+		counters.put(c2, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.TaskManagerQueryScopeInfo("tmid", "B"), "c2"));
+		meters.put(m1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.JobQueryScopeInfo("jid", "C"), "c3"));
+		gauges.put(g1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.TaskQueryScopeInfo("jid", "vid", 2, "D"), "g1"));
+		histograms.put(h1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.OperatorQueryScopeInfo("jid", "vid", 2, "opname", "E"), "h1"));
+
+		byte[] serialized = serializer.serialize(counters, gauges, histograms, meters);
+		List<MetricDump> deserialized = deserializer.deserialize(serialized);
+
+		// two counters, one gauge, one histogram and one meter were registered
+		assertEquals(5, deserialized.size());
+
+		// each verified dump removes its source metric from the input maps; the maps must be
+		// empty afterwards, which shows that every registered metric was seen
+		for (MetricDump metric : deserialized) {
+			switch (metric.getCategory()) {
+				case METRIC_CATEGORY_COUNTER:
+					MetricDump.CounterDump counterDump = (MetricDump.CounterDump) metric;
+					switch ((byte) counterDump.count) {
+						case 1:
+							assertTrue(counterDump.scopeInfo instanceof QueryScopeInfo.JobManagerQueryScopeInfo);
+							assertEquals("A", counterDump.scopeInfo.scope);
+							assertEquals("c1", counterDump.name);
+							counters.remove(c1);
+							break;
+						case 2:
+							assertTrue(counterDump.scopeInfo instanceof QueryScopeInfo.TaskManagerQueryScopeInfo);
+							assertEquals("B", counterDump.scopeInfo.scope);
+							assertEquals("c2", counterDump.name);
+							assertEquals("tmid", ((QueryScopeInfo.TaskManagerQueryScopeInfo) counterDump.scopeInfo).taskManagerID);
+							counters.remove(c2);
+							break;
+						default:
+							fail();
+					}
+					break;
+				case METRIC_CATEGORY_GAUGE:
+					MetricDump.GaugeDump gaugeDump = (MetricDump.GaugeDump) metric;
+					assertEquals("4", gaugeDump.value);
+					assertEquals("g1", gaugeDump.name);
+
+					assertTrue(gaugeDump.scopeInfo instanceof QueryScopeInfo.TaskQueryScopeInfo);
+					QueryScopeInfo.TaskQueryScopeInfo taskInfo = (QueryScopeInfo.TaskQueryScopeInfo) gaugeDump.scopeInfo;
+					assertEquals("D", taskInfo.scope);
+					assertEquals("jid", taskInfo.jobID);
+					assertEquals("vid", taskInfo.vertexID);
+					assertEquals(2, taskInfo.subtaskIndex);
+					gauges.remove(g1);
+					break;
+				case METRIC_CATEGORY_HISTOGRAM:
+					MetricDump.HistogramDump histogramDump = (MetricDump.HistogramDump) metric;
+					assertEquals("h1", histogramDump.name);
+					assertEquals(0.5, histogramDump.median, 0.1);
+					assertEquals(0.75, histogramDump.p75, 0.1);
+					assertEquals(0.90, histogramDump.p90, 0.1);
+					assertEquals(0.95, histogramDump.p95, 0.1);
+					assertEquals(0.98, histogramDump.p98, 0.1);
+					assertEquals(0.99, histogramDump.p99, 0.1);
+					assertEquals(0.999, histogramDump.p999, 0.1);
+					assertEquals(4, histogramDump.mean, 0.1);
+					assertEquals(5, histogramDump.stddev, 0.1);
+					assertEquals(6, histogramDump.max);
+					assertEquals(7, histogramDump.min);
+
+					assertTrue(histogramDump.scopeInfo instanceof QueryScopeInfo.OperatorQueryScopeInfo);
+					QueryScopeInfo.OperatorQueryScopeInfo opInfo = (QueryScopeInfo.OperatorQueryScopeInfo) histogramDump.scopeInfo;
+					assertEquals("E", opInfo.scope);
+					assertEquals("jid", opInfo.jobID);
+					assertEquals("vid", opInfo.vertexID);
+					assertEquals(2, opInfo.subtaskIndex);
+					assertEquals("opname", opInfo.operatorName);
+					histograms.remove(h1);
+					break;
+				case METRIC_CATEGORY_METER:
+					MetricDump.MeterDump meterDump = (MetricDump.MeterDump) metric;
+					assertEquals(5.0, meterDump.rate, 0.1);
+
+					assertTrue(meterDump.scopeInfo instanceof QueryScopeInfo.JobQueryScopeInfo);
+					assertEquals("C", meterDump.scopeInfo.scope);
+					assertEquals("c3", meterDump.name);
+					assertEquals("jid", ((QueryScopeInfo.JobQueryScopeInfo) meterDump.scopeInfo).jobID);
+					meters.remove(m1);
+					break;
+				default:
+					fail();
+			}
+		}
+		assertTrue(counters.isEmpty());
+		assertTrue(gauges.isEmpty());
+		assertTrue(histograms.isEmpty());
+		assertTrue(meters.isEmpty());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpTest.java
new file mode 100644
index 0000000..3b65184
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.dump;
+
+import org.junit.Test;
+
+import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_COUNTER;
+import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_GAUGE;
+import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_HISTOGRAM;
+import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_METER;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests that each {@link MetricDump} subtype exposes the values it was constructed with and
+ * reports the category constant that belongs to it.
+ */
+public class MetricDumpTest {
+
+	/** Creates the scope info that is attached to the dump under test. */
+	private static QueryScopeInfo newScopeInfo() {
+		return new QueryScopeInfo.JobManagerQueryScopeInfo();
+	}
+
+	/** A counter dump keeps its name, count and scope info. */
+	@Test
+	public void testDumpedCounter() {
+		final QueryScopeInfo scopeInfo = newScopeInfo();
+		final MetricDump.CounterDump counterDump = new MetricDump.CounterDump(scopeInfo, "counter", 4);
+
+		assertEquals("counter", counterDump.name);
+		assertEquals(4, counterDump.count);
+		assertEquals(scopeInfo, counterDump.scopeInfo);
+		assertEquals(METRIC_CATEGORY_COUNTER, counterDump.getCategory());
+	}
+
+	/** A gauge dump keeps its name, string value and scope info. */
+	@Test
+	public void testDumpedGauge() {
+		final QueryScopeInfo scopeInfo = newScopeInfo();
+		final MetricDump.GaugeDump gaugeDump = new MetricDump.GaugeDump(scopeInfo, "gauge", "hello");
+
+		assertEquals("gauge", gaugeDump.name);
+		assertEquals("hello", gaugeDump.value);
+		assertEquals(scopeInfo, gaugeDump.scopeInfo);
+		assertEquals(METRIC_CATEGORY_GAUGE, gaugeDump.getCategory());
+	}
+
+	/** A histogram dump keeps its name, all of its statistics and scope info. */
+	@Test
+	public void testDumpedHistogram() {
+		final QueryScopeInfo scopeInfo = newScopeInfo();
+		final MetricDump.HistogramDump histogramDump =
+			new MetricDump.HistogramDump(scopeInfo, "hist", 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
+
+		assertEquals("hist", histogramDump.name);
+		assertEquals(1, histogramDump.min);
+		assertEquals(2, histogramDump.max);
+		assertEquals(3, histogramDump.mean, 0.1);
+		assertEquals(4, histogramDump.median, 0.1);
+		assertEquals(5, histogramDump.stddev, 0.1);
+		assertEquals(6, histogramDump.p75, 0.1);
+		assertEquals(7, histogramDump.p90, 0.1);
+		assertEquals(8, histogramDump.p95, 0.1);
+		assertEquals(9, histogramDump.p98, 0.1);
+		assertEquals(10, histogramDump.p99, 0.1);
+		assertEquals(11, histogramDump.p999, 0.1);
+		assertEquals(scopeInfo, histogramDump.scopeInfo);
+		assertEquals(METRIC_CATEGORY_HISTOGRAM, histogramDump.getCategory());
+	}
+
+	/** A meter dump keeps its name, rate and scope info. */
+	@Test
+	public void testDumpedMeter() {
+		final QueryScopeInfo scopeInfo = newScopeInfo();
+		final MetricDump.MeterDump meterDump = new MetricDump.MeterDump(scopeInfo, "meter", 5.0);
+
+		assertEquals("meter", meterDump.name);
+		assertEquals(5.0, meterDump.rate, 0.1);
+		assertEquals(scopeInfo, meterDump.scopeInfo);
+		assertEquals(METRIC_CATEGORY_METER, meterDump.getCategory());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
new file mode 100644
index 0000000..91563ec
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.dump;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.testkit.TestActorRef;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.TestingHistogram;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests that {@link MetricQueryService} answers dump requests with a serialized byte array,
+ * and that the second dump, requested after the removals were announced, contains only zeros.
+ */
+public class MetricQueryServiceTest extends TestLogger {
+	@Test
+	public void testCreateDump() throws Exception {
+		ActorSystem s = AkkaUtils.createLocalActorSystem(new Configuration());
+		try {
+			ActorRef serviceActor = MetricQueryService.startMetricQueryService(s);
+			TestActorRef testActorRef = TestActorRef.create(s, Props.create(TestActor.class));
+			TestActor testActor = (TestActor) testActorRef.underlyingActor();
+
+			final Counter c = new SimpleCounter();
+			final Gauge<String> g = new Gauge<String>() {
+				@Override
+				public String getValue() {
+					return "Hello";
+				}
+			};
+			final Histogram h = new TestingHistogram();
+			final Meter m = new Meter() {
+
+				@Override
+				public void markEvent() {
+				}
+
+				@Override
+				public void markEvent(long n) {
+				}
+
+				@Override
+				public double getRate() {
+					return 5;
+				}
+
+				@Override
+				public long getCount() {
+					return 10;
+				}
+			};
+
+			MetricRegistry registry = new MetricRegistry(new Configuration());
+			final TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
+
+			MetricQueryService.notifyOfAddedMetric(serviceActor, c, "counter", tm);
+			MetricQueryService.notifyOfAddedMetric(serviceActor, g, "gauge", tm);
+			MetricQueryService.notifyOfAddedMetric(serviceActor, h, "histogram", tm);
+			MetricQueryService.notifyOfAddedMetric(serviceActor, m, "meter", tm);
+
+			// these metrics will be removed *after* the first query
+			MetricQueryService.notifyOfRemovedMetric(serviceActor, c);
+			MetricQueryService.notifyOfRemovedMetric(serviceActor, g);
+			MetricQueryService.notifyOfRemovedMetric(serviceActor, h);
+			MetricQueryService.notifyOfRemovedMetric(serviceActor, m);
+
+			serviceActor.tell(MetricQueryService.getCreateDump(), testActorRef);
+			byte[] dump = (byte[]) awaitMessage(testActor);
+			assertTrue(dump.length > 0);
+
+			serviceActor.tell(MetricQueryService.getCreateDump(), testActorRef);
+			byte[] emptyDump = (byte[]) awaitMessage(testActor);
+			// NOTE(review): 16 zero bytes is taken to be the serialized form of an empty dump;
+			// confirm against MetricDumpSerialization.
+			assertEquals(16, emptyDump.length);
+			for (int x = 0; x < 16; x++) {
+				assertEquals(0, emptyDump[x]);
+			}
+		} finally {
+			// always release the actor system, even when an assertion above fails
+			s.shutdown();
+		}
+	}
+
+	/**
+	 * Blocks until the given test actor has stored a message, then returns and clears it.
+	 *
+	 * @param actor the actor whose {@code message} field is awaited
+	 * @return the message that was received
+	 * @throws InterruptedException if the calling thread is interrupted while waiting
+	 */
+	private static Object awaitMessage(TestActor actor) throws InterruptedException {
+		synchronized (actor.lock) {
+			// Object.wait() may return spuriously, so the condition is re-checked in a loop
+			while (actor.message == null) {
+				actor.lock.wait();
+			}
+			Object received = actor.message;
+			actor.message = null;
+			return received;
+		}
+	}
+
+	/** Actor that stores the last received message and wakes up threads waiting on {@link #lock}. */
+	private static class TestActor extends UntypedActor {
+		public Object message;
+		public final Object lock = new Object();
+
+		@Override
+		public void onReceive(Object message) throws Exception {
+			synchronized (lock) {
+				this.message = message;
+				lock.notifyAll();
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
new file mode 100644
index 0000000..597e376
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.dump;
+
+import org.junit.Test;
+
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JM;
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JOB;
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR;
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK;
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM;
+import static org.junit.Assert.assertEquals;
+
+public class QueryScopeInfoTest { // checks the fields and category reported by each QueryScopeInfo subtype
+	@Test
+	public void testJobManagerMetricInfo() {
+		QueryScopeInfo.JobManagerQueryScopeInfo jmInfo = new QueryScopeInfo.JobManagerQueryScopeInfo("abc");
+		assertEquals(INFO_CATEGORY_JM, jmInfo.getCategory());
+		assertEquals("abc", jmInfo.scope);
+	}
+
+	@Test
+	public void testTaskManagerMetricInfo() {
+		QueryScopeInfo.TaskManagerQueryScopeInfo tmInfo = new QueryScopeInfo.TaskManagerQueryScopeInfo("tmid", "abc");
+		assertEquals(INFO_CATEGORY_TM, tmInfo.getCategory());
+		assertEquals("tmid", tmInfo.taskManagerID);
+		assertEquals("abc", tmInfo.scope);
+	}
+
+	@Test
+	public void testJobMetricInfo() {
+		QueryScopeInfo.JobQueryScopeInfo jobInfo = new QueryScopeInfo.JobQueryScopeInfo("jobid", "abc");
+		assertEquals(INFO_CATEGORY_JOB, jobInfo.getCategory());
+		assertEquals("jobid", jobInfo.jobID);
+		assertEquals("abc", jobInfo.scope);
+	}
+
+	@Test
+	public void testTaskMetricInfo() {
+		QueryScopeInfo.TaskQueryScopeInfo taskInfo = new QueryScopeInfo.TaskQueryScopeInfo("jid", "vid", 2, "abc");
+		assertEquals(INFO_CATEGORY_TASK, taskInfo.getCategory());
+		assertEquals("jid", taskInfo.jobID);
+		assertEquals("vid", taskInfo.vertexID);
+		assertEquals(2, taskInfo.subtaskIndex);
+		assertEquals("abc", taskInfo.scope);
+	}
+
+	@Test
+	public void testOperatorMetricInfo() {
+		QueryScopeInfo.OperatorQueryScopeInfo opInfo = new QueryScopeInfo.OperatorQueryScopeInfo("jid", "vid", 2, "opname", "abc");
+		assertEquals(INFO_CATEGORY_OPERATOR, opInfo.getCategory());
+		assertEquals("jid", opInfo.jobID);
+		assertEquals("vid", opInfo.vertexID);
+		assertEquals(2, opInfo.subtaskIndex);
+		assertEquals("opname", opInfo.operatorName);
+		assertEquals("abc", opInfo.scope);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
index d9b1ebe..78aac64 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
@@ -18,7 +18,9 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.junit.Test;
 
 import static org.junit.Assert.assertTrue;
@@ -33,6 +35,10 @@ public class AbstractMetricGroupTest {
 		MetricRegistry registry = new MetricRegistry(new Configuration());
 
 		AbstractMetricGroup group = new AbstractMetricGroup<AbstractMetricGroup<?>>(registry, new String[0], null) {
+			@Override
+			protected QueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) {
+				return null; // stub: returns no query scope info; the assertion below only inspects getAllVariables()
+			}
 		};
 		assertTrue(group.getAllVariables().isEmpty());
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
index faf42ea..1b9b24f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
@@ -22,13 +22,16 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-public class JobManagerGroupTest {
+public class JobManagerGroupTest extends TestLogger {
 
 	// ------------------------------------------------------------------------
 	//  adding and removing jobs
@@ -116,4 +119,13 @@ public class JobManagerGroupTest {
 
 		registry.shutdown();
 	}
+
+	@Test
+	public void testCreateQueryServiceMetricInfo() { // a JobManager group reports an empty query scope
+		MetricRegistry registry = new MetricRegistry(new Configuration());
+		JobManagerMetricGroup jm = new JobManagerMetricGroup(registry, "host");
+		QueryScopeInfo.JobManagerQueryScopeInfo info = jm.createQueryServiceMetricInfo(new DummyCharacterFilter());
+		assertEquals("", info.scope);
+		registry.shutdown(); // release the registry once the assertions are done
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
index 45f37ac..c3443f2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
@@ -22,12 +22,15 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
-public class JobManagerJobGroupTest {
+public class JobManagerJobGroupTest extends TestLogger {
 
 	@Test
 	public void testGenerateScopeDefault() {
@@ -92,4 +95,16 @@ public class JobManagerJobGroupTest {
 
 		registry.shutdown();
 	}
+
+	@Test
+	public void testCreateQueryServiceMetricInfo() { // a JobManager job group reports an empty scope plus its job ID
+		JobID jid = new JobID();
+		MetricRegistry registry = new MetricRegistry(new Configuration());
+		JobManagerMetricGroup jm = new JobManagerMetricGroup(registry, "host");
+		JobManagerJobMetricGroup jmj = new JobManagerJobMetricGroup(registry, jm, jid, "jobname");
+		QueryScopeInfo.JobQueryScopeInfo info = jmj.createQueryServiceMetricInfo(new DummyCharacterFilter());
+		assertEquals("", info.scope);
+		assertEquals(jid.toString(), info.jobID);
+		registry.shutdown(); // release the registry once the assertions are done
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
index a27206d..3fe8d75 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
@@ -18,19 +18,25 @@
 
 package org.apache.flink.runtime.metrics.groups;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.TestLogger;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
 
-public class MetricGroupTest {
+public class MetricGroupTest extends TestLogger {
 	
 	private MetricRegistry registry;
 
@@ -110,6 +116,24 @@ public class MetricGroupTest {
 		assertNotNull(group.addGroup(name));
 		assertNotNull(group.counter(name));
 	}
+
+	@Test
+	public void testCreateQueryServiceMetricInfo() { // a user group under a task reports its own name as the scope
+		JobID jid = new JobID();
+		AbstractID vid = new AbstractID();
+		AbstractID eid = new AbstractID();
+		MetricRegistry registry = new MetricRegistry(new Configuration());
+		TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
+		TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname");
+		TaskMetricGroup task = new TaskMetricGroup(registry, job, vid, eid, "taskName", 4, 5);
+		GenericMetricGroup userGroup = new GenericMetricGroup(registry, task, "hello");
+		QueryScopeInfo.TaskQueryScopeInfo info = (QueryScopeInfo.TaskQueryScopeInfo) userGroup.createQueryServiceMetricInfo(new DummyCharacterFilter());
+		assertEquals("hello", info.scope);
+		assertEquals(jid.toString(), info.jobID);
+		assertEquals(vid.toString(), info.vertexID);
+		assertEquals(4, info.subtaskIndex);
+		registry.shutdown(); // shuts down the local registry created above, which shadows the class field
+	}
 	
 	// ------------------------------------------------------------------------
 	
@@ -139,6 +163,11 @@ public class MetricGroupTest {
 		}
 
 		@Override
+		protected QueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) {
+			return null; // stub: this test group supplies no query scope info
+		}
+
+		@Override
 		protected void addMetric(String name, Metric metric) {}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
index c193ac8..2357936 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
@@ -21,9 +21,12 @@ package org.apache.flink.runtime.metrics.groups;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
+import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.AbstractID;
 
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import java.util.Map;
@@ -31,9 +34,8 @@ import java.util.Map;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 
-public class OperatorGroupTest {
+public class OperatorGroupTest extends TestLogger {
 
 	@Test
 	public void testGenerateScopeDefault() {
@@ -91,4 +93,23 @@ public class OperatorGroupTest {
 		assertNotNull(actualValue);
 		assertEquals(expectedValue, actualValue);
 	}
+
+	@Test
+	public void testCreateQueryServiceMetricInfo() { // an operator group reports job, vertex, subtask and operator name
+		JobID jid = new JobID();
+		AbstractID vid = new AbstractID();
+		AbstractID eid = new AbstractID();
+		MetricRegistry registry = new MetricRegistry(new Configuration());
+		TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
+		TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname");
+		TaskMetricGroup task = new TaskMetricGroup(registry, job, vid, eid, "taskName", 4, 5);
+		OperatorMetricGroup operator = new OperatorMetricGroup(registry, task, "operator");
+		QueryScopeInfo.OperatorQueryScopeInfo info = operator.createQueryServiceMetricInfo(new DummyCharacterFilter());
+		assertEquals("", info.scope);
+		assertEquals(jid.toString(), info.jobID);
+		assertEquals(vid.toString(), info.vertexID);
+		assertEquals(4, info.subtaskIndex);
+		assertEquals("operator", info.operatorName);
+		registry.shutdown(); // release the registry once the assertions are done
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
index b2c5dc7..a68e59d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
@@ -29,9 +29,12 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.AbstractID;
 
 import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -40,7 +43,7 @@ import java.util.ArrayList;
 
 import static org.junit.Assert.*;
 
-public class TaskManagerGroupTest {
+public class TaskManagerGroupTest extends TestLogger {
 
 	// ------------------------------------------------------------------------
 	//  adding and removing jobs
@@ -269,4 +272,14 @@ public class TaskManagerGroupTest {
 		assertEquals("constant.host.foo.host.name", group.getMetricIdentifier("name"));
 		registry.shutdown();
 	}
+
+	@Test
+	public void testCreateQueryServiceMetricInfo() { // a TaskManager group reports an empty scope plus its ID
+		MetricRegistry registry = new MetricRegistry(new Configuration());
+		TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
+		QueryScopeInfo.TaskManagerQueryScopeInfo info = tm.createQueryServiceMetricInfo(new DummyCharacterFilter());
+		assertEquals("", info.scope);
+		assertEquals("id", info.taskManagerID);
+		registry.shutdown(); // release the registry once the assertions are done
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
index c96af45..175ded1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
@@ -23,12 +23,15 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
-public class TaskManagerJobGroupTest {
+public class TaskManagerJobGroupTest extends TestLogger {
 
 	@Test
 	public void testGenerateScopeDefault() {
@@ -90,4 +93,16 @@ public class TaskManagerJobGroupTest {
 				jmGroup.getMetricIdentifier("name"));
 		registry.shutdown();
 	}
+
+	@Test
+	public void testCreateQueryServiceMetricInfo() { // a TaskManager job group reports an empty scope plus its job ID
+		JobID jid = new JobID();
+		MetricRegistry registry = new MetricRegistry(new Configuration());
+		TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
+		TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname");
+		QueryScopeInfo.JobQueryScopeInfo info = job.createQueryServiceMetricInfo(new DummyCharacterFilter());
+		assertEquals("", info.scope);
+		assertEquals(jid.toString(), info.jobID);
+		registry.shutdown(); // release the registry once the assertions are done
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
index da07f8f..c65c1da 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
@@ -24,15 +24,17 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.AbstractID;
-
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-public class TaskMetricGroupTest {
+public class TaskMetricGroupTest extends TestLogger {
 
 	// ------------------------------------------------------------------------
 	//  scope tests
@@ -110,6 +112,23 @@ public class TaskMetricGroupTest {
 	}
 
 	@Test
+	public void testCreateQueryServiceMetricInfo() { // a task group reports an empty scope plus job, vertex and subtask
+		JobID jid = new JobID();
+		AbstractID vid = new AbstractID();
+		AbstractID eid = new AbstractID();
+		MetricRegistry registry = new MetricRegistry(new Configuration());
+		TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
+		TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname");
+		TaskMetricGroup task = new TaskMetricGroup(registry, job, vid, eid, "taskName", 4, 5);
+		QueryScopeInfo.TaskQueryScopeInfo info = task.createQueryServiceMetricInfo(new DummyCharacterFilter());
+		assertEquals("", info.scope);
+		assertEquals(jid.toString(), info.jobID);
+		assertEquals(vid.toString(), info.vertexID);
+		assertEquals(4, info.subtaskIndex);
+		registry.shutdown(); // release the registry once the assertions are done
+	}
+
+	@Test
 	public void testTaskMetricGroupCleanup() {
 		CountingMetricRegistry registry = new CountingMetricRegistry(new Configuration());
 		TaskManagerMetricGroup taskManagerMetricGroup = new TaskManagerMetricGroup(registry, "localhost", "0");

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/DummyCharacterFilter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/DummyCharacterFilter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/DummyCharacterFilter.java
new file mode 100644
index 0000000..601f734
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/DummyCharacterFilter.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.util;
+
+import org.apache.flink.metrics.CharacterFilter;
+
+public class DummyCharacterFilter implements CharacterFilter { // test filter that leaves its input untouched
+	@Override
+	public String filterCharacters(String input) {
+		return input; // identity: no characters are replaced or removed
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestingHistogram.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestingHistogram.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestingHistogram.java
new file mode 100644
index 0000000..82f8504
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestingHistogram.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.util;
+
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+
+public class TestingHistogram implements Histogram { // histogram stub that reports fixed values
+
+	@Override
+	public long getCount() {
+		return 1L; // constant count, unaffected by update()
+	}
+
+	@Override
+	public void update(long value) { // intentionally a no-op: values are never recorded
+	}
+
+	@Override
+	public HistogramStatistics getStatistics() {
+		return new HistogramStatistics() { // each statistic yields its own distinct constant
+			@Override
+			public int size() {
+				return 3;
+			}
+
+			@Override
+			public long[] getValues() {
+				return new long[0];
+			}
+
+			@Override
+			public long getMin() {
+				return 7L;
+			}
+
+			@Override
+			public long getMax() {
+				return 6L;
+			}
+
+			@Override
+			public double getMean() {
+				return 4.0;
+			}
+
+			@Override
+			public double getStdDev() {
+				return 5.0;
+			}
+
+			@Override
+			public double getQuantile(double quantile) {
+				return quantile; // echoes the requested quantile back
+			}
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index bda4174..bc83db9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 
@@ -138,7 +139,8 @@ public class TaskManagerComponentsStartupShutdownTest {
 				ioManager,
 				network,
 				numberOfSlots,
-				leaderRetrievalService);
+				leaderRetrievalService,
+				new MetricRegistry(config));
 
 			final ActorRef taskManager = actorSystem.actorOf(tmProps);