You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/09/15 17:18:39 UTC
[2/4] flink git commit: [FLINK-4389] Expose metrics to WebFrontend
http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
new file mode 100644
index 0000000..143faaf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.dump;
+
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JM;
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JOB;
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR;
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK;
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM;
+
+/**
+ * Utility class for the serialization of metrics.
+ */
+public class MetricDumpSerialization {
+ private static final Logger LOG = LoggerFactory.getLogger(MetricDumpSerialization.class);
+ /** Not instantiable: this class only hosts static helpers and the nested (de)serializer types. */
+ private MetricDumpSerialization() {
+ }
+
+ //-------------------------------------------------------------------------
+ // Serialization
+ //-------------------------------------------------------------------------
+ public static class MetricDumpSerializer { // reuses one internal buffer, so serialize() must not run concurrently
+ private final ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+ private final DataOutputStream dos = new DataOutputStream(baos);
+
+ /**
+ * Serializes the given metrics into a byte array: four int counts, then each metric's scope info, name and value.
+ * @param counters counters to serialize
+ * @param gauges gauges to serialize
+ * @param histograms histograms to serialize
+ * @param meters meters to serialize
+ * @return byte array containing the serialized metrics
+ * @throws IOException if writing to the internal buffer fails
+ */
+ public byte[] serialize(
+ Map<Counter, Tuple2<QueryScopeInfo, String>> counters,
+ Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges,
+ Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms,
+ Map<Meter, Tuple2<QueryScopeInfo, String>> meters) throws IOException {
+ // Start from an empty buffer; the same stream pair is reused for every dump.
+ baos.reset();
+ dos.writeInt(counters.size());
+ dos.writeInt(gauges.size());
+ dos.writeInt(histograms.size());
+ dos.writeInt(meters.size());
+
+ for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>> entry : counters.entrySet()) {
+ serializeMetricInfo(dos, entry.getValue().f0);
+ serializeString(dos, entry.getValue().f1);
+ serializeCounter(dos, entry.getKey());
+ }
+
+ for (Map.Entry<Gauge<?>, Tuple2<QueryScopeInfo, String>> entry : gauges.entrySet()) {
+ serializeMetricInfo(dos, entry.getValue().f0);
+ serializeString(dos, entry.getValue().f1);
+ serializeGauge(dos, entry.getKey());
+ }
+
+ for (Map.Entry<Histogram, Tuple2<QueryScopeInfo, String>> entry : histograms.entrySet()) {
+ serializeMetricInfo(dos, entry.getValue().f0);
+ serializeString(dos, entry.getValue().f1);
+ serializeHistogram(dos, entry.getKey());
+ }
+
+ for (Map.Entry<Meter, Tuple2<QueryScopeInfo, String>> entry : meters.entrySet()) {
+ serializeMetricInfo(dos, entry.getValue().f0);
+ serializeString(dos, entry.getValue().f1);
+ serializeMeter(dos, entry.getKey());
+ }
+ return baos.toByteArray();
+ }
+ /** Closes the underlying streams; failures are only logged at debug level. */
+ public void close() {
+ try {
+ dos.close();
+ } catch (Exception e) {
+ LOG.debug("Failed to close OutputStream.", e);
+ }
+ try {
+ baos.close();
+ } catch (Exception e) {
+ LOG.debug("Failed to close OutputStream.", e);
+ }
+ }
+ }
+ /** Writes the scope string, the category byte and then the category-specific fields of the given scope info. */
+ private static void serializeMetricInfo(DataOutputStream dos, QueryScopeInfo info) throws IOException {
+ serializeString(dos, info.scope);
+ dos.writeByte(info.getCategory());
+ switch (info.getCategory()) {
+ case INFO_CATEGORY_JM:
+ break;
+ case INFO_CATEGORY_TM:
+ String tmID = ((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID;
+ serializeString(dos, tmID);
+ break;
+ case INFO_CATEGORY_JOB:
+ QueryScopeInfo.JobQueryScopeInfo jobInfo = (QueryScopeInfo.JobQueryScopeInfo) info;
+ serializeString(dos, jobInfo.jobID);
+ break;
+ case INFO_CATEGORY_TASK:
+ QueryScopeInfo.TaskQueryScopeInfo taskInfo = (QueryScopeInfo.TaskQueryScopeInfo) info;
+ serializeString(dos, taskInfo.jobID);
+ serializeString(dos, taskInfo.vertexID);
+ dos.writeInt(taskInfo.subtaskIndex);
+ break;
+ case INFO_CATEGORY_OPERATOR:
+ QueryScopeInfo.OperatorQueryScopeInfo operatorInfo = (QueryScopeInfo.OperatorQueryScopeInfo) info;
+ serializeString(dos, operatorInfo.jobID);
+ serializeString(dos, operatorInfo.vertexID);
+ dos.writeInt(operatorInfo.subtaskIndex);
+ serializeString(dos, operatorInfo.operatorName);
+ break;
+ }
+ }
+ /** Writes the string as an int byte-length followed by its UTF-8 bytes (fixed charset, independent of the platform default). */
+ private static void serializeString(DataOutputStream dos, String string) throws IOException {
+ byte[] bytes = string.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+ dos.writeInt(bytes.length);
+ dos.write(bytes);
+ }
+ /** Writes the counter's current count as a long. */
+ private static void serializeCounter(DataOutputStream dos, Counter counter) throws IOException {
+ dos.writeLong(counter.getCount());
+ }
+ /** Writes the gauge's current value in string form; a null value is written as "null" instead of failing the whole dump. */
+ private static void serializeGauge(DataOutputStream dos, Gauge<?> gauge) throws IOException {
+ serializeString(dos, String.valueOf(gauge.getValue()));
+ }
+ /** Writes min, max, mean, median, standard deviation and the 75/90/95/98/99/99.9 percentiles, in that order. */
+ private static void serializeHistogram(DataOutputStream dos, Histogram histogram) throws IOException {
+ HistogramStatistics stat = histogram.getStatistics();
+
+ dos.writeLong(stat.getMin());
+ dos.writeLong(stat.getMax());
+ dos.writeDouble(stat.getMean());
+ dos.writeDouble(stat.getQuantile(0.5));
+ dos.writeDouble(stat.getStdDev());
+ dos.writeDouble(stat.getQuantile(0.75));
+ dos.writeDouble(stat.getQuantile(0.90));
+ dos.writeDouble(stat.getQuantile(0.95));
+ dos.writeDouble(stat.getQuantile(0.98));
+ dos.writeDouble(stat.getQuantile(0.99));
+ dos.writeDouble(stat.getQuantile(0.999));
+ }
+ /** Writes the meter's current rate as a double. */
+ private static void serializeMeter(DataOutputStream dos, Meter meter) throws IOException {
+ dos.writeDouble(meter.getRate());
+ }
+
+ //-------------------------------------------------------------------------
+ // Deserialization
+ //-------------------------------------------------------------------------
+ public static class MetricDumpDeserializer {
+ /**
+ * De-serializes metrics from the given byte array and returns them as a list of {@link MetricDump}.
+ * The data is expected to have the layout written by {@link MetricDumpSerializer}.
+ * @param data serialized metrics
+ * @return A list containing the deserialized metrics.
+ * @throws IOException if the data ends prematurely or contains an unknown scope category
+ */
+ public List<MetricDump> deserialize(byte[] data) throws IOException {
+ ByteArrayInputStream bais = new ByteArrayInputStream(data);
+ DataInputStream dis = new DataInputStream(bais);
+
+ int numCounters = dis.readInt();
+ int numGauges = dis.readInt();
+ int numHistograms = dis.readInt();
+ int numMeters = dis.readInt();
+
+ List<MetricDump> metrics = new ArrayList<>(numCounters + numGauges + numHistograms + numMeters);
+
+ for (int x = 0; x < numCounters; x++) {
+ metrics.add(deserializeCounter(dis));
+ }
+
+ for (int x = 0; x < numGauges; x++) {
+ metrics.add(deserializeGauge(dis));
+ }
+
+ for (int x = 0; x < numHistograms; x++) {
+ metrics.add(deserializeHistogram(dis));
+ }
+
+ for (int x = 0; x < numMeters; x++) {
+ metrics.add(deserializeMeter(dis));
+ }
+
+ return metrics;
+ }
+ }
+ /** Reads an int byte-length followed by that many bytes and decodes them as UTF-8. */
+ private static String deserializeString(DataInputStream dis) throws IOException {
+ int stringLength = dis.readInt();
+ byte[] bytes = new byte[stringLength];
+ dis.readFully(bytes);
+ return new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
+ }
+ /** Reads a counter dump: scope info, name, count. */
+ private static MetricDump.CounterDump deserializeCounter(DataInputStream dis) throws IOException {
+ QueryScopeInfo scope = deserializeMetricInfo(dis);
+ String name = deserializeString(dis);
+ return new MetricDump.CounterDump(scope, name, dis.readLong());
+ }
+ /** Reads a gauge dump: scope info, name, value string. */
+ private static MetricDump.GaugeDump deserializeGauge(DataInputStream dis) throws IOException {
+ QueryScopeInfo scope = deserializeMetricInfo(dis);
+ String name = deserializeString(dis);
+ String value = deserializeString(dis);
+ return new MetricDump.GaugeDump(scope, name, value);
+ }
+ /** Reads a histogram dump; the field order mirrors {@code serializeHistogram}. */
+ private static MetricDump.HistogramDump deserializeHistogram(DataInputStream dis) throws IOException {
+ QueryScopeInfo info = deserializeMetricInfo(dis);
+ String name = deserializeString(dis);
+ long min = dis.readLong();
+ long max = dis.readLong();
+ double mean = dis.readDouble();
+ double median = dis.readDouble();
+ double stddev = dis.readDouble();
+ double p75 = dis.readDouble();
+ double p90 = dis.readDouble();
+ double p95 = dis.readDouble();
+ double p98 = dis.readDouble();
+ double p99 = dis.readDouble();
+ double p999 = dis.readDouble();
+ return new MetricDump.HistogramDump(info, name, min, max, mean, median, stddev, p75, p90, p95, p98, p99, p999);
+ }
+ /** Reads a meter dump: scope info, name, rate. */
+ private static MetricDump.MeterDump deserializeMeter(DataInputStream dis) throws IOException {
+ QueryScopeInfo info = deserializeMetricInfo(dis);
+ String name = deserializeString(dis);
+ double rate = dis.readDouble();
+ return new MetricDump.MeterDump(info, name, rate);
+ }
+ /** Reads the scope string and category byte, then the category-specific fields, and builds the matching scope info. */
+ private static QueryScopeInfo deserializeMetricInfo(DataInputStream dis) throws IOException {
+ String jobID;
+ String vertexID;
+ int subtaskIndex;
+
+ String scope = deserializeString(dis);
+ byte cat = dis.readByte();
+ switch (cat) {
+ case INFO_CATEGORY_JM:
+ return new QueryScopeInfo.JobManagerQueryScopeInfo(scope);
+ case INFO_CATEGORY_TM:
+ String tmID = deserializeString(dis);
+ return new QueryScopeInfo.TaskManagerQueryScopeInfo(tmID, scope);
+ case INFO_CATEGORY_JOB:
+ jobID = deserializeString(dis);
+ return new QueryScopeInfo.JobQueryScopeInfo(jobID, scope);
+ case INFO_CATEGORY_TASK:
+ jobID = deserializeString(dis);
+ vertexID = deserializeString(dis);
+ subtaskIndex = dis.readInt();
+ return new QueryScopeInfo.TaskQueryScopeInfo(jobID, vertexID, subtaskIndex, scope);
+ case INFO_CATEGORY_OPERATOR:
+ jobID = deserializeString(dis);
+ vertexID = deserializeString(dis);
+ subtaskIndex = dis.readInt();
+ String operatorName = deserializeString(dis);
+ return new QueryScopeInfo.OperatorQueryScopeInfo(jobID, vertexID, subtaskIndex, operatorName, scope);
+ default:
+ throw new IOException("Unknown scope category: " + cat);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
new file mode 100644
index 0000000..6e0b443
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.dump;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.Status;
+import akka.actor.UntypedActor;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpSerializer;
+
+/**
+ * The MetricQueryService creates a key-value representation of all metrics currently registered with Flink when queried.
+ *
+ * It is realized as an actor and can be notified of
+ * - an added metric by calling {@link MetricQueryService#notifyOfAddedMetric(ActorRef, Metric, String, AbstractMetricGroup)}
+ * - a removed metric by calling {@link MetricQueryService#notifyOfRemovedMetric(ActorRef, Metric)}
+ * - a metric dump request by sending the return value of {@link MetricQueryService#getCreateDump()}
+ */
+public class MetricQueryService extends UntypedActor {
+ private static final Logger LOG = LoggerFactory.getLogger(MetricQueryService.class);
+
+ public static final String METRIC_QUERY_SERVICE_NAME = "MetricQueryService";
+ /** Filter applied to scope components and metric names; delegates to {@link #replaceInvalidChars(String)}. */
+ private static final CharacterFilter FILTER = new CharacterFilter() {
+ @Override
+ public String filterCharacters(String input) {
+ return replaceInvalidChars(input);
+ }
+ };
+
+ private final MetricDumpSerializer serializer = new MetricDumpSerializer();
+
+ private final Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new HashMap<>();
+ private final Map<Counter, Tuple2<QueryScopeInfo, String>> counters = new HashMap<>();
+ private final Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>();
+ private final Map<Meter, Tuple2<QueryScopeInfo, String>> meters = new HashMap<>();
+ /** Releases the serializer's streams when the actor stops. */
+ @Override
+ public void postStop() {
+ serializer.close();
+ }
+ /** Handles AddMetric, RemoveMetric and CreateDump messages; any other message is answered with a {@link Status.Failure}. */
+ @Override
+ public void onReceive(Object message) {
+ try {
+ if (message instanceof AddMetric) {
+ AddMetric added = (AddMetric) message;
+ Metric metric = added.metric;
+ QueryScopeInfo info = added.group.getQueryServiceMetricInfo(FILTER);
+ String metricName = FILTER.filterCharacters(added.metricName);
+
+ if (metric instanceof Counter) {
+ counters.put((Counter) metric, new Tuple2<>(info, metricName));
+ } else if (metric instanceof Gauge) {
+ gauges.put((Gauge<?>) metric, new Tuple2<>(info, metricName));
+ } else if (metric instanceof Histogram) {
+ histograms.put((Histogram) metric, new Tuple2<>(info, metricName));
+ } else if (metric instanceof Meter) {
+ meters.put((Meter) metric, new Tuple2<>(info, metricName));
+ }
+ } else if (message instanceof RemoveMetric) {
+ Metric metric = (((RemoveMetric) message).metric);
+ if (metric instanceof Counter) {
+ this.counters.remove(metric);
+ } else if (metric instanceof Gauge) {
+ this.gauges.remove(metric);
+ } else if (metric instanceof Histogram) {
+ this.histograms.remove(metric);
+ } else if (metric instanceof Meter) {
+ this.meters.remove(metric);
+ }
+ } else if (message instanceof CreateDump) {
+ byte[] dump = serializer.serialize(counters, gauges, histograms, meters);
+ getSender().tell(dump, getSelf());
+ } else {
+ LOG.warn("MetricQueryServiceActor received an invalid message. " + message.toString());
+ getSender().tell(new Status.Failure(new IOException("MetricQueryServiceActor received an invalid message. " + message.toString())), getSelf());
+ }
+ } catch (Exception e) {
+ LOG.warn("An exception occurred while processing a message.", e);
+ if (message instanceof CreateDump) { // the requester expects a reply; fail it instead of letting it time out
+ getSender().tell(new Status.Failure(e), getSelf());
+ }
+ }
+ }
+
+ /**
+ * Lightweight method to replace unsupported characters.
+ * If the string does not contain any unsupported characters, this method creates no
+ * new string (and in fact no new objects at all).
+ *
+ * <p>Replacements:
+ *
+ * <ul>
+ * <li>{@code space : . ,} are replaced by {@code _} (underscore)</li>
+ * </ul>
+ */
+ static String replaceInvalidChars(String str) {
+ char[] chars = null;
+ final int strLen = str.length();
+ int pos = 0;
+
+ for (int i = 0; i < strLen; i++) {
+ final char c = str.charAt(i);
+ switch (c) {
+ case ' ':
+ case '.':
+ case ':':
+ case ',':
+ if (chars == null) { // first replacement: switch to a mutable copy of the input
+ chars = str.toCharArray();
+ }
+ chars[pos++] = '_';
+ break;
+ default:
+ if (chars != null) {
+ chars[pos] = c;
+ }
+ pos++;
+ }
+ }
+
+ return chars == null ? str : new String(chars, 0, pos);
+ }
+
+ /**
+ * Starts the MetricQueryService actor in the given actor system.
+ *
+ * @param actorSystem The actor system running the MetricQueryService
+ * @return actor reference to the MetricQueryService
+ */
+ public static ActorRef startMetricQueryService(ActorSystem actorSystem) {
+ return actorSystem.actorOf(Props.create(MetricQueryService.class), METRIC_QUERY_SERVICE_NAME);
+ }
+
+ /**
+ * Utility method to notify a MetricQueryService of an added metric.
+ *
+ * @param service MetricQueryService to notify
+ * @param metric added metric
+ * @param metricName metric name
+ * @param group group the metric was added on
+ */
+ public static void notifyOfAddedMetric(ActorRef service, Metric metric, String metricName, AbstractMetricGroup group) {
+ service.tell(new AddMetric(metricName, metric, group), ActorRef.noSender());
+ }
+
+ /**
+ * Utility method to notify a MetricQueryService of a removed metric.
+ *
+ * @param service MetricQueryService to notify
+ * @param metric removed metric
+ */
+ public static void notifyOfRemovedMetric(ActorRef service, Metric metric) {
+ service.tell(new RemoveMetric(metric), ActorRef.noSender());
+ }
+ /** Message announcing a newly registered metric together with its name and the group it was added on. */
+ private static class AddMetric {
+ private final String metricName;
+ private final Metric metric;
+ private final AbstractMetricGroup group;
+
+ private AddMetric(String metricName, Metric metric, AbstractMetricGroup group) {
+ this.metricName = metricName;
+ this.metric = metric;
+ this.group = group;
+ }
+ }
+ /** Message announcing that a metric was removed. */
+ private static class RemoveMetric {
+ private final Metric metric;
+
+ private RemoveMetric(Metric metric) {
+ this.metric = metric;
+ }
+ }
+ /** Returns the message object that requests a metric dump from the service. */
+ public static Object getCreateDump() {
+ return CreateDump.INSTANCE;
+ }
+ /** Singleton message that requests a metric dump. */
+ private static class CreateDump implements Serializable {
+ private static final CreateDump INSTANCE = new CreateDump();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
new file mode 100644
index 0000000..df5c2bf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.dump;
+
+/**
+ * Container for scope related information as required by the MetricQueryService.
+ */
+public abstract class QueryScopeInfo {
+ /** Categories to be returned by {@link QueryScopeInfo#getCategory()} to avoid instanceof checks. */
+ public static final byte INFO_CATEGORY_JM = 0;
+ public static final byte INFO_CATEGORY_TM = 1;
+ public static final byte INFO_CATEGORY_JOB = 2;
+ public static final byte INFO_CATEGORY_TASK = 3;
+ public static final byte INFO_CATEGORY_OPERATOR = 4;
+
+ /** The remaining scope not covered by specific fields */
+ public final String scope;
+
+ private QueryScopeInfo(String scope) {
+ this.scope = scope;
+ }
+
+ /**
+ * Creates a copy of this QueryScopeInfo with the given scope appended; non-empty scopes are joined with a '.'.
+ * @param userScope scope to append
+ * @return modified copy of this QueryScopeInfo
+ */
+ public abstract QueryScopeInfo copy(String userScope);
+
+ /** Returns the category of this scope info, i.e. one of the {@code INFO_CATEGORY_*} constants. */
+ public abstract byte getCategory();
+
+ /** Appends the given scope to {@link #scope}, inserting a '.' separator unless the current scope is empty. */
+ protected String concatScopes(String additionalScope) {
+ return scope.isEmpty() ? additionalScope : scope + "." + additionalScope;
+ }
+
+ /**
+ * Container for the job manager scope. Stores no additional information.
+ */
+ public static class JobManagerQueryScopeInfo extends QueryScopeInfo {
+ public JobManagerQueryScopeInfo() {
+ super("");
+ }
+
+ public JobManagerQueryScopeInfo(String scope) {
+ super(scope);
+ }
+
+ @Override
+ public JobManagerQueryScopeInfo copy(String additionalScope) {
+ return new JobManagerQueryScopeInfo(concatScopes(additionalScope));
+ }
+
+ @Override
+ public byte getCategory() {
+ return INFO_CATEGORY_JM;
+ }
+ }
+
+ /**
+ * Container for the task manager scope. Stores the ID of the task manager.
+ */
+ public static class TaskManagerQueryScopeInfo extends QueryScopeInfo {
+ public final String taskManagerID;
+
+ public TaskManagerQueryScopeInfo(String taskManagerId) {
+ this(taskManagerId, "");
+ }
+
+ public TaskManagerQueryScopeInfo(String taskManagerId, String scope) {
+ super(scope);
+ this.taskManagerID = taskManagerId;
+ }
+
+ @Override
+ public TaskManagerQueryScopeInfo copy(String additionalScope) {
+ return new TaskManagerQueryScopeInfo(this.taskManagerID, concatScopes(additionalScope));
+ }
+
+ @Override
+ public byte getCategory() {
+ return INFO_CATEGORY_TM;
+ }
+ }
+
+ /**
+ * Container for the job scope. Stores the ID of the job.
+ */
+ public static class JobQueryScopeInfo extends QueryScopeInfo {
+ public final String jobID;
+
+ public JobQueryScopeInfo(String jobID) {
+ this(jobID, "");
+ }
+
+ public JobQueryScopeInfo(String jobID, String scope) {
+ super(scope);
+ this.jobID = jobID;
+ }
+
+ @Override
+ public JobQueryScopeInfo copy(String additionalScope) {
+ return new JobQueryScopeInfo(this.jobID, concatScopes(additionalScope));
+ }
+
+ @Override
+ public byte getCategory() {
+ return INFO_CATEGORY_JOB;
+ }
+ }
+
+ /**
+ * Container for the task scope. Stores the ID of the job/vertex and subtask index.
+ */
+ public static class TaskQueryScopeInfo extends QueryScopeInfo {
+ public final String jobID;
+ public final String vertexID;
+ public final int subtaskIndex;
+
+ public TaskQueryScopeInfo(String jobID, String vertexid, int subtaskIndex) {
+ this(jobID, vertexid, subtaskIndex, "");
+ }
+
+ public TaskQueryScopeInfo(String jobID, String vertexid, int subtaskIndex, String scope) {
+ super(scope);
+ this.jobID = jobID;
+ this.vertexID = vertexid;
+ this.subtaskIndex = subtaskIndex;
+ }
+
+ @Override
+ public TaskQueryScopeInfo copy(String additionalScope) {
+ return new TaskQueryScopeInfo(this.jobID, this.vertexID, this.subtaskIndex, concatScopes(additionalScope));
+ }
+
+ @Override
+ public byte getCategory() {
+ return INFO_CATEGORY_TASK;
+ }
+ }
+
+ /**
+ * Container for the operator scope. Stores the ID of the job/vertex, the subtask index and the name of the operator.
+ */
+ public static class OperatorQueryScopeInfo extends QueryScopeInfo {
+ public final String jobID;
+ public final String vertexID;
+ public final int subtaskIndex;
+ public final String operatorName;
+
+ public OperatorQueryScopeInfo(String jobID, String vertexid, int subtaskIndex, String operatorName) {
+ this(jobID, vertexid, subtaskIndex, operatorName, "");
+ }
+
+ public OperatorQueryScopeInfo(String jobID, String vertexid, int subtaskIndex, String operatorName, String scope) {
+ super(scope);
+ this.jobID = jobID;
+ this.vertexID = vertexid;
+ this.subtaskIndex = subtaskIndex;
+ this.operatorName = operatorName;
+ }
+
+ @Override
+ public OperatorQueryScopeInfo copy(String additionalScope) {
+ return new OperatorQueryScopeInfo(this.jobID, this.vertexID, this.subtaskIndex, this.operatorName, concatScopes(additionalScope));
+ }
+
+ @Override
+ public byte getCategory() {
+ return INFO_CATEGORY_OPERATOR;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
index 89fe3cd..75476a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
@@ -27,6 +27,7 @@ import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,6 +87,9 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
* For example: "host-7.taskmanager-2.window_word_count.my-mapper" */
private String scopeString;
+ /** The metrics query service scope represented by this group, lazily computed. */
+ protected QueryScopeInfo queryServiceScopeInfo;
+
/** Flag indicating whether this group has been closed */
private volatile boolean closed;
@@ -123,6 +127,27 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
}
/**
+ * Returns the metric query service scope for this group.
+ * The scope is created on the first call and cached in this group; later calls return the cached instance.
+ * @param filter character filter, only passed on when the scope has not been created yet
+ * @return query service scope
+ */
+ public QueryScopeInfo getQueryServiceMetricInfo(CharacterFilter filter) {
+ if (queryServiceScopeInfo == null) { // not created yet
+ queryServiceScopeInfo = createQueryServiceMetricInfo(filter);
+ }
+ return queryServiceScopeInfo;
+ }
+
+ /**
+ * Creates the metric query service scope for this group.
+ * Called by {@link #getQueryServiceMetricInfo(CharacterFilter)} when no scope has been cached yet.
+ * @param filter character filter handed through from {@link #getQueryServiceMetricInfo(CharacterFilter)}
+ * @return query service scope
+ */
+ protected abstract QueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter);
+
+ /**
* Returns the fully qualified metric name, for example
* {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}
*
http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java
index 569ad0f..ab6418d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java
@@ -18,16 +18,26 @@
package org.apache.flink.runtime.metrics.groups;
+import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
/**
* A simple named {@link org.apache.flink.metrics.MetricGroup} that is used to hold
* subgroups of metrics.
*/
public class GenericMetricGroup extends AbstractMetricGroup<AbstractMetricGroup<?>> {
+ /** The name of this group */
+ private String name;
public GenericMetricGroup(MetricRegistry registry, AbstractMetricGroup parent, String name) {
super(registry, makeScopeComponents(parent, name), parent);
+ this.name = name; // kept for createQueryServiceMetricInfo, which appends it to the parent's scope
+ }
+
+ @Override
+ protected QueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) { // parent's scope plus this group's filtered name
+ return parent.getQueryServiceMetricInfo(filter).copy(filter.filterCharacters(this.name));
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
index da0d8f8..2f6b07a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
@@ -18,8 +18,10 @@
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import java.util.HashMap;
@@ -46,6 +48,11 @@ public class JobManagerMetricGroup extends ComponentMetricGroup<JobManagerMetric
return hostname;
}
+ @Override
+ protected QueryScopeInfo.JobManagerQueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) {
+ return new QueryScopeInfo.JobManagerQueryScopeInfo(); // empty scope, no IDs to record; the filter is not used
+ }
+
// ------------------------------------------------------------------------
// job groups
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java
index e101d2f..091807f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import javax.annotation.Nullable;
@@ -65,6 +67,11 @@ public abstract class JobMetricGroup<C extends ComponentMetricGroup<C>> extends
return jobName;
}
+ /**
+  * Builds the query scope info that identifies this job group by its job ID.
+  *
+  * @param filter character filter supplied by the caller; not applied to the job ID
+  * @return scope info carrying the string form of {@code jobId}
+  */
+ @Override
+ protected QueryScopeInfo.JobQueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) {
+     final String jobIdAsString = jobId.toString();
+     return new QueryScopeInfo.JobQueryScopeInfo(jobIdAsString);
+ }
+
// ------------------------------------------------------------------------
// Component Metric Group Specifics
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
index 9352faf..1ed55d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
@@ -18,7 +18,9 @@
package org.apache.flink.runtime.metrics.groups;
+import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import java.util.Collections;
@@ -42,6 +44,15 @@ public class OperatorMetricGroup extends ComponentMetricGroup<TaskMetricGroup> {
public final TaskMetricGroup parent() {
return parent;
}
+
+ /**
+  * Builds the query scope info for this operator from the enclosing task group and its job group.
+  *
+  * @param filter character filter applied to the operator name
+  * @return scope info with job ID, vertex ID, subtask index and the filtered operator name
+  */
+ @Override
+ protected QueryScopeInfo.OperatorQueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) {
+     final String jobIdAsString = parent.parent.jobId.toString();
+     // NOTE(review): TaskMetricGroup declares vertexId as @Nullable; a null value would make
+     // this call throw a NullPointerException - confirm it is always set for operator groups.
+     final String vertexIdAsString = parent.vertexId.toString();
+     final int subtask = parent.subtaskIndex;
+     final String filteredOperatorName = filter.filterCharacters(operatorName);
+     return new QueryScopeInfo.OperatorQueryScopeInfo(jobIdAsString, vertexIdAsString, subtask, filteredOperatorName);
+ }
// ------------------------------------------------------------------------
// Component Metric Group Specifics
http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
index 8f81cfd..5bdd014 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
@@ -19,8 +19,10 @@
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import java.util.HashMap;
@@ -55,6 +57,11 @@ public class TaskManagerMetricGroup extends ComponentMetricGroup<TaskManagerMetr
return taskManagerId;
}
+ /**
+  * Creates the query scope info that identifies this TaskManager group.
+  *
+  * @param filter character filter supplied by the caller; not used by this implementation
+  * @return a new {@link QueryScopeInfo.TaskManagerQueryScopeInfo} built from {@code taskManagerId}
+  */
+ @Override
+ protected QueryScopeInfo.TaskManagerQueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) {
+ return new QueryScopeInfo.TaskManagerQueryScopeInfo(this.taskManagerId);
+ }
+
// ------------------------------------------------------------------------
// job groups
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
index 78fec97..0e76ab0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
@@ -18,7 +18,9 @@
package org.apache.flink.runtime.metrics.groups;
+import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import org.apache.flink.util.AbstractID;
@@ -43,12 +45,12 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
private final AbstractID executionId;
@Nullable
- private final AbstractID vertexId;
+ protected final AbstractID vertexId;
@Nullable
private final String taskName;
- private final int subtaskIndex;
+ protected final int subtaskIndex;
private final int attemptNumber;
@@ -113,6 +115,14 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
return ioMetrics;
}
+ /**
+  * Builds the query scope info for this task from the parent job ID, the vertex ID and the subtask index.
+  *
+  * @param filter character filter supplied by the caller; not used by this implementation
+  * @return scope info carrying the job ID string, the vertex ID string and the subtask index
+  */
+ @Override
+ protected QueryScopeInfo.TaskQueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) {
+     final String jobIdAsString = parent.jobId.toString();
+     // NOTE(review): vertexId is declared @Nullable on this class; a null value would make
+     // this call throw a NullPointerException - confirm callers always provide it.
+     final String vertexIdAsString = vertexId.toString();
+     return new QueryScopeInfo.TaskQueryScopeInfo(jobIdAsString, vertexIdAsString, subtaskIndex);
+ }
+
// ------------------------------------------------------------------------
// operators and cleanup
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index f67be0e..1c68874 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2750,6 +2750,12 @@ object JobManager {
case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName)
case None => actorSystem.actorOf(jobManagerProps)
}
+
+ // Start the metric query service on the given actor system when a registry is present;
+ // nothing happens for None.
+ metricsRegistry.foreach(registry => registry.startQueryService(actorSystem))
(jobManager, archive)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index cac5d91..27c9dd9 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -203,7 +203,8 @@ class LocalFlinkMiniCluster(
memoryManager,
ioManager,
network,
- leaderRetrievalService) = TaskManager.createTaskManagerComponents(
+ leaderRetrievalService,
+ metricsRegistry) = TaskManager.createTaskManagerComponents(
config,
resourceID,
hostname, // network interface to bind to
@@ -218,7 +219,10 @@ class LocalFlinkMiniCluster(
memoryManager,
ioManager,
network,
- leaderRetrievalService)
+ leaderRetrievalService,
+ metricsRegistry)
+
+ metricsRegistry.startQueryService(system)
system.actorOf(props, taskManagerActorName)
}
@@ -274,7 +278,8 @@ class LocalFlinkMiniCluster(
memoryManager: MemoryManager,
ioManager: IOManager,
networkEnvironment: NetworkEnvironment,
- leaderRetrievalService: LeaderRetrievalService): Props = {
+ leaderRetrievalService: LeaderRetrievalService,
+ metricsRegistry: MetricRegistry): Props = {
TaskManager.getTaskManagerProps(
taskManagerClass,
@@ -284,7 +289,8 @@ class LocalFlinkMiniCluster(
memoryManager,
ioManager,
networkEnvironment,
- leaderRetrievalService)
+ leaderRetrievalService,
+ metricsRegistry)
}
def getResourceManagerProps(
http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 8ebdd80..c882631 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -132,7 +132,8 @@ class TaskManager(
protected val ioManager: IOManager,
protected val network: NetworkEnvironment,
protected val numberOfSlots: Int,
- protected val leaderRetrievalService: LeaderRetrievalService)
+ protected val leaderRetrievalService: LeaderRetrievalService,
+ protected val metricsRegistry: FlinkMetricRegistry)
extends FlinkActor
with LeaderSessionMessageFilter // Mixin order is important: We want to filter after logging
with LogMessages // Mixin order is important: first we want to support message logging
@@ -158,7 +159,6 @@ class TaskManager(
/** Registry of metrics periodically transmitted to the JobManager */
private val metricRegistry = TaskManager.createMetricsRegistry()
- private var metricsRegistry : FlinkMetricRegistry = _
private var taskManagerMetricGroup : TaskManagerMetricGroup = _
/** Metric serialization */
@@ -276,11 +276,7 @@ class TaskManager(
// failsafe shutdown of the metrics registry
try {
- val reg = metricsRegistry
- metricsRegistry = null
- if (reg != null) {
- reg.shutdown()
- }
+ metricsRegistry.shutdown()
} catch {
case t: Exception => log.error("MetricRegistry did not shutdown properly.", t)
}
@@ -985,8 +981,6 @@ class TaskManager(
else {
libraryCacheManager = Some(new FallbackLibraryCacheManager)
}
-
- metricsRegistry = new FlinkMetricRegistry(config.configuration)
taskManagerMetricGroup =
new TaskManagerMetricGroup(metricsRegistry, this.runtimeInfo.getHostname, id.toString)
@@ -1064,9 +1058,12 @@ class TaskManager(
network.getKvStateRegistry.unregisterListener()
}
- // stop the metrics reporters
- metricsRegistry.shutdown()
- metricsRegistry = null
+ // failsafe shutdown of the metrics registry
+ try {
+ metricsRegistry.shutdown()
+ } catch {
+ case t: Exception => log.error("MetricRegistry did not shutdown properly.", t)
+ }
}
protected def handleJobManagerDisconnect(jobManager: ActorRef, msg: String): Unit = {
@@ -1849,7 +1846,8 @@ object TaskManager {
memoryManager,
ioManager,
network,
- leaderRetrievalService) = createTaskManagerComponents(
+ leaderRetrievalService,
+ metricsRegistry) = createTaskManagerComponents(
configuration,
resourceID,
taskManagerHostname,
@@ -1865,7 +1863,10 @@ object TaskManager {
memoryManager,
ioManager,
network,
- leaderRetrievalService)
+ leaderRetrievalService,
+ metricsRegistry)
+
+ metricsRegistry.startQueryService(actorSystem)
taskManagerActorName match {
case Some(actorName) => actorSystem.actorOf(tmProps, actorName)
@@ -1881,7 +1882,8 @@ object TaskManager {
memoryManager: MemoryManager,
ioManager: IOManager,
networkEnvironment: NetworkEnvironment,
- leaderRetrievalService: LeaderRetrievalService
+ leaderRetrievalService: LeaderRetrievalService,
+ metricsRegistry: FlinkMetricRegistry
): Props = {
Props(
taskManagerClass,
@@ -1892,7 +1894,8 @@ object TaskManager {
ioManager,
networkEnvironment,
taskManagerConfig.numberOfSlots,
- leaderRetrievalService)
+ leaderRetrievalService,
+ metricsRegistry)
}
def createTaskManagerComponents(
@@ -1906,7 +1909,8 @@ object TaskManager {
MemoryManager,
IOManager,
NetworkEnvironment,
- LeaderRetrievalService) = {
+ LeaderRetrievalService,
+ FlinkMetricRegistry) = {
val (taskManagerConfig : TaskManagerConfiguration,
netConfig: NetworkEnvironmentConfiguration,
@@ -2081,12 +2085,15 @@ object TaskManager {
case None => LeaderRetrievalUtils.createLeaderRetrievalService(configuration)
}
+ val metricsRegistry = new FlinkMetricRegistry(configuration)
+
(taskManagerConfig,
taskManagerLocation,
memoryManager,
ioManager,
network,
- leaderRetrievalService)
+ leaderRetrievalService,
+ metricsRegistry)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
new file mode 100644
index 0000000..bc0f005
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.dump;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.metrics.util.TestingHistogram;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_COUNTER;
+import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_GAUGE;
+import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_HISTOGRAM;
+import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_METER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+
+/**
+ * Round-trip test for {@link MetricDumpSerialization}: metrics of every category are serialized,
+ * deserialized again and compared against the values that were put in.
+ */
+public class MetricDumpSerializerTest {
+    /**
+     * Serializes two counters, one gauge, one histogram and one meter, each registered under a
+     * different {@link QueryScopeInfo} subtype, and verifies every deserialized {@link MetricDump}.
+     *
+     * <p>Each verified metric is removed from its source map, so the emptiness checks at the end
+     * fail if any registered metric is missing from the deserialized list.
+     *
+     * @throws IOException declared for the serializer/deserializer calls
+     */
+    @Test
+    public void testSerialization() throws IOException {
+        MetricDumpSerialization.MetricDumpSerializer serializer = new MetricDumpSerialization.MetricDumpSerializer();
+        MetricDumpSerialization.MetricDumpDeserializer deserializer = new MetricDumpSerialization.MetricDumpDeserializer();
+
+        Map<Counter, Tuple2<QueryScopeInfo, String>> counters = new HashMap<>();
+        Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new HashMap<>();
+        Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>();
+        Map<Meter, Tuple2<QueryScopeInfo, String>> meters = new HashMap<>();
+
+        SimpleCounter c1 = new SimpleCounter();
+        SimpleCounter c2 = new SimpleCounter();
+
+        // the distinct counts are used below to tell the two deserialized counters apart
+        c1.inc(1);
+        c2.inc(2);
+
+        Gauge<Integer> g1 = new Gauge<Integer>() {
+            @Override
+            public Integer getValue() {
+                return 4;
+            }
+        };
+
+        Histogram h1 = new TestingHistogram();
+
+        Meter m1 = new Meter() {
+            @Override
+            public void markEvent() {
+            }
+
+            @Override
+            public void markEvent(long n) {
+            }
+
+            @Override
+            public double getRate() {
+                return 5;
+            }
+
+            @Override
+            public long getCount() {
+                return 10;
+            }
+        };
+
+        counters.put(c1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.JobManagerQueryScopeInfo("A"), "c1"));
+        counters.put(c2, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.TaskManagerQueryScopeInfo("tmid", "B"), "c2"));
+        meters.put(m1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.JobQueryScopeInfo("jid", "C"), "c3"));
+        gauges.put(g1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.TaskQueryScopeInfo("jid", "vid", 2, "D"), "g1"));
+        histograms.put(h1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.OperatorQueryScopeInfo("jid", "vid", 2, "opname", "E"), "h1"));
+
+        byte[] serialized = serializer.serialize(counters, gauges, histograms, meters);
+        List<MetricDump> deserialized = deserializer.deserialize(serialized);
+
+        // two counters, one gauge, one histogram and one meter were serialized
+        assertEquals(5, deserialized.size());
+
+        for (MetricDump metric : deserialized) {
+            switch (metric.getCategory()) {
+                case METRIC_CATEGORY_COUNTER:
+                    MetricDump.CounterDump counterDump = (MetricDump.CounterDump) metric;
+                    switch ((byte) counterDump.count) {
+                        case 1:
+                            assertTrue(counterDump.scopeInfo instanceof QueryScopeInfo.JobManagerQueryScopeInfo);
+                            assertEquals("A", counterDump.scopeInfo.scope);
+                            assertEquals("c1", counterDump.name);
+                            counters.remove(c1);
+                            break;
+                        case 2:
+                            assertTrue(counterDump.scopeInfo instanceof QueryScopeInfo.TaskManagerQueryScopeInfo);
+                            assertEquals("B", counterDump.scopeInfo.scope);
+                            assertEquals("c2", counterDump.name);
+                            assertEquals("tmid", ((QueryScopeInfo.TaskManagerQueryScopeInfo) counterDump.scopeInfo).taskManagerID);
+                            counters.remove(c2);
+                            break;
+                        default:
+                            fail();
+                    }
+                    break;
+                case METRIC_CATEGORY_GAUGE:
+                    MetricDump.GaugeDump gaugeDump = (MetricDump.GaugeDump) metric;
+                    assertEquals("4", gaugeDump.value);
+                    assertEquals("g1", gaugeDump.name);
+
+                    assertTrue(gaugeDump.scopeInfo instanceof QueryScopeInfo.TaskQueryScopeInfo);
+                    QueryScopeInfo.TaskQueryScopeInfo taskInfo = (QueryScopeInfo.TaskQueryScopeInfo) gaugeDump.scopeInfo;
+                    assertEquals("D", taskInfo.scope);
+                    assertEquals("jid", taskInfo.jobID);
+                    assertEquals("vid", taskInfo.vertexID);
+                    assertEquals(2, taskInfo.subtaskIndex);
+                    gauges.remove(g1);
+                    break;
+                case METRIC_CATEGORY_HISTOGRAM:
+                    MetricDump.HistogramDump histogramDump = (MetricDump.HistogramDump) metric;
+                    assertEquals("h1", histogramDump.name);
+                    assertEquals(0.5, histogramDump.median, 0.1);
+                    assertEquals(0.75, histogramDump.p75, 0.1);
+                    assertEquals(0.90, histogramDump.p90, 0.1);
+                    assertEquals(0.95, histogramDump.p95, 0.1);
+                    assertEquals(0.98, histogramDump.p98, 0.1);
+                    assertEquals(0.99, histogramDump.p99, 0.1);
+                    assertEquals(0.999, histogramDump.p999, 0.1);
+                    assertEquals(4, histogramDump.mean, 0.1);
+                    assertEquals(5, histogramDump.stddev, 0.1);
+                    assertEquals(6, histogramDump.max);
+                    assertEquals(7, histogramDump.min);
+
+                    assertTrue(histogramDump.scopeInfo instanceof QueryScopeInfo.OperatorQueryScopeInfo);
+                    QueryScopeInfo.OperatorQueryScopeInfo opInfo = (QueryScopeInfo.OperatorQueryScopeInfo) histogramDump.scopeInfo;
+                    assertEquals("E", opInfo.scope);
+                    assertEquals("jid", opInfo.jobID);
+                    assertEquals("vid", opInfo.vertexID);
+                    assertEquals(2, opInfo.subtaskIndex);
+                    assertEquals("opname", opInfo.operatorName);
+                    histograms.remove(h1);
+                    break;
+                case METRIC_CATEGORY_METER:
+                    MetricDump.MeterDump meterDump = (MetricDump.MeterDump) metric;
+                    assertEquals(5.0, meterDump.rate, 0.1);
+
+                    assertTrue(meterDump.scopeInfo instanceof QueryScopeInfo.JobQueryScopeInfo);
+                    assertEquals("C", meterDump.scopeInfo.scope);
+                    assertEquals("c3", meterDump.name);
+                    assertEquals("jid", ((QueryScopeInfo.JobQueryScopeInfo) meterDump.scopeInfo).jobID);
+                    // mark the meter as seen, like the other categories, so the check below covers it
+                    meters.remove(m1);
+                    break;
+                default:
+                    fail();
+            }
+        }
+        // every registered metric must have been found in the deserialized list
+        assertTrue(counters.isEmpty());
+        assertTrue(gauges.isEmpty());
+        assertTrue(histograms.isEmpty());
+        assertTrue(meters.isEmpty());
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpTest.java
new file mode 100644
index 0000000..3b65184
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.dump;
+
+import org.junit.Test;
+
+import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_COUNTER;
+import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_GAUGE;
+import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_HISTOGRAM;
+import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_METER;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Checks that each {@link MetricDump} subtype exposes the values passed to its constructor and
+ * reports the matching category constant.
+ */
+public class MetricDumpTest {
+    /** A counter dump keeps its scope, name and count. */
+    @Test
+    public void testDumpedCounter() {
+        final QueryScopeInfo scope = new QueryScopeInfo.JobManagerQueryScopeInfo();
+        final MetricDump.CounterDump counterDump = new MetricDump.CounterDump(scope, "counter", 4);
+
+        assertEquals("counter", counterDump.name);
+        assertEquals(4, counterDump.count);
+        assertEquals(scope, counterDump.scopeInfo);
+        assertEquals(METRIC_CATEGORY_COUNTER, counterDump.getCategory());
+    }
+
+    /** A gauge dump keeps its scope, name and string value. */
+    @Test
+    public void testDumpedGauge() {
+        final QueryScopeInfo scope = new QueryScopeInfo.JobManagerQueryScopeInfo();
+        final MetricDump.GaugeDump gaugeDump = new MetricDump.GaugeDump(scope, "gauge", "hello");
+
+        assertEquals("gauge", gaugeDump.name);
+        assertEquals("hello", gaugeDump.value);
+        assertEquals(scope, gaugeDump.scopeInfo);
+        assertEquals(METRIC_CATEGORY_GAUGE, gaugeDump.getCategory());
+    }
+
+    /**
+     * A histogram dump keeps its scope, name and the eleven statistics, which are passed in the
+     * order min, max, mean, median, stddev, p75, p90, p95, p98, p99, p999.
+     */
+    @Test
+    public void testDumpedHistogram() {
+        final QueryScopeInfo scope = new QueryScopeInfo.JobManagerQueryScopeInfo();
+        final MetricDump.HistogramDump histogramDump =
+            new MetricDump.HistogramDump(scope, "hist", 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
+
+        assertEquals("hist", histogramDump.name);
+        assertEquals(1, histogramDump.min);
+        assertEquals(2, histogramDump.max);
+        assertEquals(3, histogramDump.mean, 0.1);
+        assertEquals(4, histogramDump.median, 0.1);
+        assertEquals(5, histogramDump.stddev, 0.1);
+        assertEquals(6, histogramDump.p75, 0.1);
+        assertEquals(7, histogramDump.p90, 0.1);
+        assertEquals(8, histogramDump.p95, 0.1);
+        assertEquals(9, histogramDump.p98, 0.1);
+        assertEquals(10, histogramDump.p99, 0.1);
+        assertEquals(11, histogramDump.p999, 0.1);
+        assertEquals(scope, histogramDump.scopeInfo);
+        assertEquals(METRIC_CATEGORY_HISTOGRAM, histogramDump.getCategory());
+    }
+
+    /** A meter dump keeps its scope, name and rate. */
+    @Test
+    public void testDumpedMeter() {
+        final QueryScopeInfo scope = new QueryScopeInfo.JobManagerQueryScopeInfo();
+        final MetricDump.MeterDump meterDump = new MetricDump.MeterDump(scope, "meter", 5.0);
+
+        assertEquals("meter", meterDump.name);
+        assertEquals(5.0, meterDump.rate, 0.1);
+        assertEquals(scope, meterDump.scopeInfo);
+        assertEquals(METRIC_CATEGORY_METER, meterDump.getCategory());
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
new file mode 100644
index 0000000..91563ec
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.dump;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.testkit.TestActorRef;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.TestingHistogram;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests that the {@link MetricQueryService} actor answers dump requests with a serialized byte
+ * array that reflects the metrics currently registered with it.
+ */
+public class MetricQueryServiceTest extends TestLogger {
+    /**
+     * Registers one metric of each type, requests a dump, then unregisters all metrics and
+     * requests a second dump, which is expected to consist of 16 zero bytes.
+     *
+     * <p>The test relies on the service handling the notifications and dump requests in the
+     * order in which they are sent here.
+     *
+     * @throws Exception if waiting for a reply is interrupted or an actor call fails
+     */
+    @Test
+    public void testCreateDump() throws Exception {
+
+        ActorSystem s = AkkaUtils.createLocalActorSystem(new Configuration());
+        try {
+            ActorRef serviceActor = MetricQueryService.startMetricQueryService(s);
+            TestActorRef testActorRef = TestActorRef.create(s, Props.create(TestActor.class));
+            TestActor testActor = (TestActor) testActorRef.underlyingActor();
+
+            final Counter c = new SimpleCounter();
+            final Gauge<String> g = new Gauge<String>() {
+                @Override
+                public String getValue() {
+                    return "Hello";
+                }
+            };
+            final Histogram h = new TestingHistogram();
+            final Meter m = new Meter() {
+
+                @Override
+                public void markEvent() {
+                }
+
+                @Override
+                public void markEvent(long n) {
+                }
+
+                @Override
+                public double getRate() {
+                    return 5;
+                }
+
+                @Override
+                public long getCount() {
+                    return 10;
+                }
+            };
+
+            MetricRegistry registry = new MetricRegistry(new Configuration());
+            final TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
+
+            MetricQueryService.notifyOfAddedMetric(serviceActor, c, "counter", tm);
+            MetricQueryService.notifyOfAddedMetric(serviceActor, g, "gauge", tm);
+            MetricQueryService.notifyOfAddedMetric(serviceActor, h, "histogram", tm);
+            MetricQueryService.notifyOfAddedMetric(serviceActor, m, "meter", tm);
+
+            // first query: all four metrics are still registered
+            serviceActor.tell(MetricQueryService.getCreateDump(), testActorRef);
+            byte[] dump = (byte[]) awaitMessage(testActor);
+            assertTrue(dump.length > 0);
+
+            // the metrics are removed only *after* the first reply has arrived, so that the
+            // first dump is taken while they are registered
+            MetricQueryService.notifyOfRemovedMetric(serviceActor, c);
+            MetricQueryService.notifyOfRemovedMetric(serviceActor, g);
+            MetricQueryService.notifyOfRemovedMetric(serviceActor, h);
+            MetricQueryService.notifyOfRemovedMetric(serviceActor, m);
+
+            // second query: nothing is registered any more
+            serviceActor.tell(MetricQueryService.getCreateDump(), testActorRef);
+            byte[] emptyDump = (byte[]) awaitMessage(testActor);
+            assertEquals(16, emptyDump.length);
+            for (int x = 0; x < 16; x++) {
+                assertEquals(0, emptyDump[x]);
+            }
+        } finally {
+            // release the actor system even when an assertion above fails
+            s.shutdown();
+        }
+    }
+
+    /**
+     * Blocks until the test actor has received a message, then returns it and clears the slot.
+     *
+     * <p>The wait is guarded by a loop because {@link Object#wait()} may return without a
+     * notification; reading and clearing happen under the same lock the actor writes under.
+     */
+    private static Object awaitMessage(TestActor actor) throws InterruptedException {
+        synchronized (actor.lock) {
+            while (actor.message == null) {
+                actor.lock.wait();
+            }
+            Object received = actor.message;
+            actor.message = null;
+            return received;
+        }
+    }
+
+    /** Actor that stores the last received message and wakes up threads waiting on {@code lock}. */
+    private static class TestActor extends UntypedActor {
+        public Object message;
+        public final Object lock = new Object();
+
+        @Override
+        public void onReceive(Object message) throws Exception {
+            synchronized (lock) {
+                this.message = message;
+                lock.notifyAll();
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
new file mode 100644
index 0000000..597e376
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.dump;
+
+import org.junit.Test;
+
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JM;
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JOB;
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR;
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK;
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Checks that every {@link QueryScopeInfo} subtype exposes its constructor arguments through
+ * its public fields and reports the matching category constant.
+ */
+public class QueryScopeInfoTest {
+    /** JobManager scope: only the scope string is stored. */
+    @Test
+    public void testJobManagerMetricInfo() {
+        final QueryScopeInfo.JobManagerQueryScopeInfo jmScope =
+            new QueryScopeInfo.JobManagerQueryScopeInfo("abc");
+
+        assertEquals("abc", jmScope.scope);
+        assertEquals(INFO_CATEGORY_JM, jmScope.getCategory());
+    }
+
+    /** TaskManager scope: TaskManager ID plus scope string. */
+    @Test
+    public void testTaskManagerMetricInfo() {
+        final QueryScopeInfo.TaskManagerQueryScopeInfo tmScope =
+            new QueryScopeInfo.TaskManagerQueryScopeInfo("tmid", "abc");
+
+        assertEquals("abc", tmScope.scope);
+        assertEquals("tmid", tmScope.taskManagerID);
+        assertEquals(INFO_CATEGORY_TM, tmScope.getCategory());
+    }
+
+    /** Job scope: job ID plus scope string. */
+    @Test
+    public void testJobMetricInfo() {
+        final QueryScopeInfo.JobQueryScopeInfo jobScope =
+            new QueryScopeInfo.JobQueryScopeInfo("jobid", "abc");
+
+        assertEquals("abc", jobScope.scope);
+        assertEquals("jobid", jobScope.jobID);
+        assertEquals(INFO_CATEGORY_JOB, jobScope.getCategory());
+    }
+
+    /** Task scope: job ID, vertex ID, subtask index plus scope string. */
+    @Test
+    public void testTaskMetricInfo() {
+        final QueryScopeInfo.TaskQueryScopeInfo taskScope =
+            new QueryScopeInfo.TaskQueryScopeInfo("jid", "vid", 2, "abc");
+
+        assertEquals("abc", taskScope.scope);
+        assertEquals("jid", taskScope.jobID);
+        assertEquals("vid", taskScope.vertexID);
+        assertEquals(2, taskScope.subtaskIndex);
+        assertEquals(INFO_CATEGORY_TASK, taskScope.getCategory());
+    }
+
+    /** Operator scope: job ID, vertex ID, subtask index, operator name plus scope string. */
+    @Test
+    public void testOperatorMetricInfo() {
+        final QueryScopeInfo.OperatorQueryScopeInfo operatorScope =
+            new QueryScopeInfo.OperatorQueryScopeInfo("jid", "vid", 2, "opname", "abc");
+
+        assertEquals("abc", operatorScope.scope);
+        assertEquals("jid", operatorScope.jobID);
+        assertEquals("vid", operatorScope.vertexID);
+        assertEquals("opname", operatorScope.operatorName);
+        assertEquals(2, operatorScope.subtaskIndex);
+        assertEquals(INFO_CATEGORY_OPERATOR, operatorScope.getCategory());
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
index d9b1ebe..78aac64 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
@@ -18,7 +18,9 @@
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
@@ -33,6 +35,10 @@ public class AbstractMetricGroupTest {
MetricRegistry registry = new MetricRegistry(new Configuration());
AbstractMetricGroup group = new AbstractMetricGroup<AbstractMetricGroup<?>>(registry, new String[0], null) {
+ @Override
+ protected QueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) {
+ return null;
+ }
};
assertTrue(group.getAllVariables().isEmpty());
http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
index faf42ea..1b9b24f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
@@ -22,13 +22,16 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
+import org.apache.flink.util.TestLogger;
import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-public class JobManagerGroupTest {
+public class JobManagerGroupTest extends TestLogger {
// ------------------------------------------------------------------------
// adding and removing jobs
@@ -116,4 +119,13 @@ public class JobManagerGroupTest {
registry.shutdown();
}
+
+ /**
+ * Checks that the query scope info created by a {@link JobManagerMetricGroup}
+ * carries an empty scope.
+ */
+ @Test
+ public void testCreateQueryServiceMetricInfo() {
+ MetricRegistry registry = new MetricRegistry(new Configuration());
+ JobManagerMetricGroup jm = new JobManagerMetricGroup(registry, "host");
+
+ QueryScopeInfo.JobManagerQueryScopeInfo info = jm.createQueryServiceMetricInfo(new DummyCharacterFilter());
+ assertEquals("", info.scope);
+
+ // Shut the registry down at the end, as the other tests in this class do.
+ registry.shutdown();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
index 45f37ac..c3443f2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
@@ -22,12 +22,15 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
+import org.apache.flink.util.TestLogger;
import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
-public class JobManagerJobGroupTest {
+public class JobManagerJobGroupTest extends TestLogger {
@Test
public void testGenerateScopeDefault() {
@@ -92,4 +95,16 @@ public class JobManagerJobGroupTest {
registry.shutdown();
}
+
+ /**
+ * Checks that the query scope info created by a {@link JobManagerJobMetricGroup}
+ * carries an empty scope and the string form of the job ID.
+ */
+ @Test
+ public void testCreateQueryServiceMetricInfo() {
+ JobID jid = new JobID();
+ MetricRegistry registry = new MetricRegistry(new Configuration());
+ JobManagerMetricGroup jm = new JobManagerMetricGroup(registry, "host");
+ JobManagerJobMetricGroup jmj = new JobManagerJobMetricGroup(registry, jm, jid, "jobname");
+
+ QueryScopeInfo.JobQueryScopeInfo info = jmj.createQueryServiceMetricInfo(new DummyCharacterFilter());
+ assertEquals("", info.scope);
+ assertEquals(jid.toString(), info.jobID);
+
+ // Shut the registry down at the end, as the other tests in this class do.
+ registry.shutdown();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
index a27206d..3fe8d75 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
@@ -18,19 +18,25 @@
package org.apache.flink.runtime.metrics.groups;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;
-public class MetricGroupTest {
+public class MetricGroupTest extends TestLogger {
private MetricRegistry registry;
@@ -110,6 +116,24 @@ public class MetricGroupTest {
assertNotNull(group.addGroup(name));
assertNotNull(group.counter(name));
}
+
+ /**
+ * Checks that a user-defined group nested below a task group produces a task-level
+ * query scope info: the scope is the user group's name, while job ID, vertex ID and
+ * subtask index come from the enclosing task group.
+ */
+ @Test
+ public void testCreateQueryServiceMetricInfo() {
+ JobID jid = new JobID();
+ AbstractID vid = new AbstractID();
+ AbstractID eid = new AbstractID();
+ // NOTE(review): this local shadows the test class's 'registry' field; consider
+ // reusing the field if it is initialised before each test - confirm against the fixture.
+ MetricRegistry registry = new MetricRegistry(new Configuration());
+ TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
+ TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname");
+ TaskMetricGroup task = new TaskMetricGroup(registry, job, vid, eid, "taskName", 4, 5);
+ GenericMetricGroup userGroup = new GenericMetricGroup(registry, task, "hello");
+
+ QueryScopeInfo.TaskQueryScopeInfo info = (QueryScopeInfo.TaskQueryScopeInfo) userGroup.createQueryServiceMetricInfo(new DummyCharacterFilter());
+ assertEquals("hello", info.scope);
+ assertEquals(jid.toString(), info.jobID);
+ assertEquals(vid.toString(), info.vertexID);
+ assertEquals(4, info.subtaskIndex);
+
+ // Shut down the registry that was created locally for this test.
+ registry.shutdown();
+ }
// ------------------------------------------------------------------------
@@ -139,6 +163,11 @@ public class MetricGroupTest {
}
@Override
+ protected QueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) {
+ return null;
+ }
+
+ @Override
protected void addMetric(String name, Metric metric) {}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
index c193ac8..2357936 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
@@ -21,9 +21,12 @@ package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
+import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.TestLogger;
import org.junit.Test;
import java.util.Map;
@@ -31,9 +34,8 @@ import java.util.Map;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-public class OperatorGroupTest {
+public class OperatorGroupTest extends TestLogger {
@Test
public void testGenerateScopeDefault() {
@@ -91,4 +93,23 @@ public class OperatorGroupTest {
assertNotNull(actualValue);
assertEquals(expectedValue, actualValue);
}
+
+ /**
+ * Checks that the query scope info created by an {@link OperatorMetricGroup} carries an
+ * empty scope, the operator name, and the job ID, vertex ID and subtask index of the
+ * enclosing task group.
+ */
+ @Test
+ public void testCreateQueryServiceMetricInfo() {
+ JobID jid = new JobID();
+ AbstractID vid = new AbstractID();
+ AbstractID eid = new AbstractID();
+ MetricRegistry registry = new MetricRegistry(new Configuration());
+ TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
+ TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname");
+ TaskMetricGroup task = new TaskMetricGroup(registry, job, vid, eid, "taskName", 4, 5);
+ OperatorMetricGroup operator = new OperatorMetricGroup(registry, task, "operator");
+
+ QueryScopeInfo.OperatorQueryScopeInfo info = operator.createQueryServiceMetricInfo(new DummyCharacterFilter());
+ assertEquals("", info.scope);
+ assertEquals(jid.toString(), info.jobID);
+ assertEquals(vid.toString(), info.vertexID);
+ assertEquals(4, info.subtaskIndex);
+ assertEquals("operator", info.operatorName);
+
+ // Shut down the registry that was created locally for this test.
+ registry.shutdown();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
index b2c5dc7..a68e59d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
@@ -29,9 +29,12 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
import org.junit.Test;
import java.io.IOException;
@@ -40,7 +43,7 @@ import java.util.ArrayList;
import static org.junit.Assert.*;
-public class TaskManagerGroupTest {
+public class TaskManagerGroupTest extends TestLogger {
// ------------------------------------------------------------------------
// adding and removing jobs
@@ -269,4 +272,14 @@ public class TaskManagerGroupTest {
assertEquals("constant.host.foo.host.name", group.getMetricIdentifier("name"));
registry.shutdown();
}
+
+ /**
+ * Checks that the query scope info created by a {@link TaskManagerMetricGroup}
+ * carries an empty scope and the task manager ID the group was built with.
+ */
+ @Test
+ public void testCreateQueryServiceMetricInfo() {
+ MetricRegistry registry = new MetricRegistry(new Configuration());
+ TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
+
+ QueryScopeInfo.TaskManagerQueryScopeInfo info = tm.createQueryServiceMetricInfo(new DummyCharacterFilter());
+ assertEquals("", info.scope);
+ assertEquals("id", info.taskManagerID);
+
+ // Shut the registry down at the end, as the other tests in this class do.
+ registry.shutdown();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
index c96af45..175ded1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
@@ -23,12 +23,15 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
+import org.apache.flink.util.TestLogger;
import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
-public class TaskManagerJobGroupTest {
+public class TaskManagerJobGroupTest extends TestLogger {
@Test
public void testGenerateScopeDefault() {
@@ -90,4 +93,16 @@ public class TaskManagerJobGroupTest {
jmGroup.getMetricIdentifier("name"));
registry.shutdown();
}
+
+ /**
+ * Checks that the query scope info created by a {@link TaskManagerJobMetricGroup}
+ * carries an empty scope and the string form of the job ID.
+ */
+ @Test
+ public void testCreateQueryServiceMetricInfo() {
+ JobID jid = new JobID();
+ MetricRegistry registry = new MetricRegistry(new Configuration());
+ TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
+ TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname");
+
+ QueryScopeInfo.JobQueryScopeInfo info = job.createQueryServiceMetricInfo(new DummyCharacterFilter());
+ assertEquals("", info.scope);
+ assertEquals(jid.toString(), info.jobID);
+
+ // Shut the registry down at the end, as the other tests in this class do.
+ registry.shutdown();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
index da07f8f..c65c1da 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
@@ -24,15 +24,17 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
import org.apache.flink.util.AbstractID;
-
+import org.apache.flink.util.TestLogger;
import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-public class TaskMetricGroupTest {
+public class TaskMetricGroupTest extends TestLogger {
// ------------------------------------------------------------------------
// scope tests
@@ -110,6 +112,23 @@ public class TaskMetricGroupTest {
}
@Test
+ public void testCreateQueryServiceMetricInfo() {
+ // Verifies that a TaskMetricGroup yields a task-level query scope info with an empty
+ // scope and the job ID, vertex ID and subtask index the group was built with.
+ JobID jid = new JobID();
+ AbstractID vid = new AbstractID();
+ AbstractID eid = new AbstractID();
+ MetricRegistry registry = new MetricRegistry(new Configuration());
+ TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
+ TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname");
+ TaskMetricGroup task = new TaskMetricGroup(registry, job, vid, eid, "taskName", 4, 5);
+
+ QueryScopeInfo.TaskQueryScopeInfo info = task.createQueryServiceMetricInfo(new DummyCharacterFilter());
+ assertEquals("", info.scope);
+ assertEquals(jid.toString(), info.jobID);
+ assertEquals(vid.toString(), info.vertexID);
+ assertEquals(4, info.subtaskIndex);
+
+ // Shut down the registry that was created locally for this test.
+ registry.shutdown();
+ }
+
+ @Test
public void testTaskMetricGroupCleanup() {
CountingMetricRegistry registry = new CountingMetricRegistry(new Configuration());
TaskManagerMetricGroup taskManagerMetricGroup = new TaskManagerMetricGroup(registry, "localhost", "0");
http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/DummyCharacterFilter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/DummyCharacterFilter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/DummyCharacterFilter.java
new file mode 100644
index 0000000..601f734
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/DummyCharacterFilter.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.util;
+
+import org.apache.flink.metrics.CharacterFilter;
+
+/**
+ * {@link CharacterFilter} for tests that performs no filtering at all:
+ * every input is handed back unchanged.
+ */
+public class DummyCharacterFilter implements CharacterFilter {
+
+ /**
+ * Returns the given string as is.
+ *
+ * @param input the string to "filter"
+ * @return the very same {@code input} reference
+ */
+ @Override
+ public String filterCharacters(final String input) {
+ return input;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestingHistogram.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestingHistogram.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestingHistogram.java
new file mode 100644
index 0000000..82f8504
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestingHistogram.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.util;
+
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+
+/**
+ * {@link Histogram} stub for tests: updates are ignored and every accessor
+ * reports a fixed, hard-coded value.
+ */
+public class TestingHistogram implements Histogram {
+
+ /** No-op; the supplied value is discarded. */
+ @Override
+ public void update(long value) {
+ }
+
+ /** Always reports a count of 1. */
+ @Override
+ public long getCount() {
+ return 1L;
+ }
+
+ /**
+ * Creates a fresh statistics object on every call. Its accessors return pairwise
+ * distinct constants (size 3, mean 4, std-dev 5, max 6, min 7), an empty value array,
+ * and the requested quantile echoed back as the quantile value. The constants are not
+ * mutually consistent (min is larger than max) - presumably they only serve to tell
+ * the individual fields apart in assertions.
+ */
+ @Override
+ public HistogramStatistics getStatistics() {
+ final HistogramStatistics fixedStatistics = new HistogramStatistics() {
+ @Override
+ public long getMin() {
+ return 7L;
+ }
+
+ @Override
+ public long getMax() {
+ return 6L;
+ }
+
+ @Override
+ public double getMean() {
+ return 4.0;
+ }
+
+ @Override
+ public double getStdDev() {
+ return 5.0;
+ }
+
+ @Override
+ public int size() {
+ return 3;
+ }
+
+ @Override
+ public long[] getValues() {
+ return new long[0];
+ }
+
+ @Override
+ public double getQuantile(double quantile) {
+ // Echo the argument so callers can check which quantile was requested.
+ return quantile;
+ }
+ };
+ return fixedStatistics;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index bda4174..bc83db9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
@@ -138,7 +139,8 @@ public class TaskManagerComponentsStartupShutdownTest {
ioManager,
network,
numberOfSlots,
- leaderRetrievalService);
+ leaderRetrievalService,
+ new MetricRegistry(config));
final ActorRef taskManager = actorSystem.actorOf(tmProps);