You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/10/18 13:48:53 UTC
flink git commit: [FLINK-7368][metrics] Backport synchronization fix for MetricStore
Repository: flink
Updated Branches:
refs/heads/release-1.3 80c23de70 -> b896b4b65
[FLINK-7368][metrics] Backport synchronization fix for MetricStore
This closes #4841.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b896b4b6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b896b4b6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b896b4b6
Branch: refs/heads/release-1.3
Commit: b896b4b65c58ebcdeb5b318960823b81015c3a0c
Parents: 80c23de
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Mon Oct 16 17:17:26 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Oct 18 15:48:00 2017 +0200
----------------------------------------------------------------------
.../handlers/TaskManagersHandler.java | 131 ++++++++++---------
.../webmonitor/metrics/MetricFetcher.java | 17 ++-
.../webmonitor/utils/MutableIOMetrics.java | 21 +--
3 files changed, 91 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b896b4b6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
index a23e983..3782ec9 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceID;
@@ -28,15 +27,18 @@ import org.apache.flink.runtime.messages.JobManagerMessages.TaskManagerInstance;
import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
import org.apache.flink.util.StringUtils;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+
+import com.fasterxml.jackson.core.JsonGenerator;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
import static java.util.Objects.requireNonNull;
public class TaskManagersHandler extends AbstractJsonRequestHandler {
@@ -109,67 +111,70 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler {
// only send metrics when only one task manager requests them.
if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
fetcher.update();
- MetricStore.TaskManagerMetricStore metrics = fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString());
- if (metrics != null) {
- gen.writeObjectFieldStart("metrics");
- long heapUsed = Long.valueOf( metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));
- long heapCommitted = Long.valueOf( metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0"));
- long heapTotal = Long.valueOf( metrics.getMetric("Status.JVM.Memory.Heap.Max", "0"));
-
- gen.writeNumberField("heapCommitted", heapCommitted);
- gen.writeNumberField("heapUsed", heapUsed);
- gen.writeNumberField("heapMax", heapTotal);
-
- long nonHeapUsed = Long.valueOf( metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0"));
- long nonHeapCommitted = Long.valueOf( metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0"));
- long nonHeapTotal = Long.valueOf( metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0"));
-
- gen.writeNumberField("nonHeapCommitted", nonHeapCommitted);
- gen.writeNumberField("nonHeapUsed", nonHeapUsed);
- gen.writeNumberField("nonHeapMax", nonHeapTotal);
-
- gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted);
- gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed);
- gen.writeNumberField("totalMax", heapTotal + nonHeapTotal);
-
- long directCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0"));
- long directUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0"));
- long directMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0"));
-
- gen.writeNumberField("directCount", directCount);
- gen.writeNumberField("directUsed", directUsed);
- gen.writeNumberField("directMax", directMax);
-
- long mappedCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0"));
- long mappedUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0"));
- long mappedMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0"));
-
- gen.writeNumberField("mappedCount", mappedCount);
- gen.writeNumberField("mappedUsed", mappedUsed);
- gen.writeNumberField("mappedMax", mappedMax);
-
- long memorySegmentsAvailable = Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0"));
- long memorySegmentsTotal = Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0"));
-
- gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable);
- gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal);
-
- gen.writeArrayFieldStart("garbageCollectors");
-
- for (String gcName : metrics.garbageCollectorNames) {
- String count = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null);
- String time = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null);
- if (count != null && time != null) {
- gen.writeStartObject();
- gen.writeStringField("name", gcName);
- gen.writeNumberField("count", Long.valueOf(count));
- gen.writeNumberField("time", Long.valueOf(time));
- gen.writeEndObject();
+ MetricStore metricStore = fetcher.getMetricStore();
+ synchronized (metricStore) {
+ MetricStore.TaskManagerMetricStore metrics = metricStore.getTaskManagerMetricStore(instance.getId().toString());
+ if (metrics != null) {
+ gen.writeObjectFieldStart("metrics");
+ long heapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));
+ long heapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0"));
+ long heapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0"));
+
+ gen.writeNumberField("heapCommitted", heapCommitted);
+ gen.writeNumberField("heapUsed", heapUsed);
+ gen.writeNumberField("heapMax", heapTotal);
+
+ long nonHeapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0"));
+ long nonHeapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0"));
+ long nonHeapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0"));
+
+ gen.writeNumberField("nonHeapCommitted", nonHeapCommitted);
+ gen.writeNumberField("nonHeapUsed", nonHeapUsed);
+ gen.writeNumberField("nonHeapMax", nonHeapTotal);
+
+ gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted);
+ gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed);
+ gen.writeNumberField("totalMax", heapTotal + nonHeapTotal);
+
+ long directCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0"));
+ long directUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0"));
+ long directMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0"));
+
+ gen.writeNumberField("directCount", directCount);
+ gen.writeNumberField("directUsed", directUsed);
+ gen.writeNumberField("directMax", directMax);
+
+ long mappedCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0"));
+ long mappedUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0"));
+ long mappedMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0"));
+
+ gen.writeNumberField("mappedCount", mappedCount);
+ gen.writeNumberField("mappedUsed", mappedUsed);
+ gen.writeNumberField("mappedMax", mappedMax);
+
+ long memorySegmentsAvailable = Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0"));
+ long memorySegmentsTotal = Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0"));
+
+ gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable);
+ gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal);
+
+ gen.writeArrayFieldStart("garbageCollectors");
+
+ for (String gcName : metrics.garbageCollectorNames) {
+ String count = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null);
+ String time = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null);
+ if (count != null && time != null) {
+ gen.writeStartObject();
+ gen.writeStringField("name", gcName);
+ gen.writeNumberField("count", Long.valueOf(count));
+ gen.writeNumberField("time", Long.valueOf(time));
+ gen.writeEndObject();
+ }
}
- }
- gen.writeEndArray();
- gen.writeEndObject();
+ gen.writeEndArray();
+ gen.writeEndObject();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b896b4b6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
index 4f92148..667cc22 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
@@ -30,23 +30,24 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
+import org.apache.flink.runtime.metrics.dump.MetricDump;
import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
import org.apache.flink.runtime.metrics.dump.MetricQueryService;
-import org.apache.flink.runtime.metrics.dump.MetricDump;
import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
import scala.Option;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpDeserializer;
/**
@@ -194,8 +195,10 @@ public class MetricFetcher {
private void addMetrics(Object result) {
MetricDumpSerialization.MetricSerializationResult data = (MetricDumpSerialization.MetricSerializationResult) result;
List<MetricDump> dumpedMetrics = deserializer.deserialize(data);
- for (MetricDump metric : dumpedMetrics) {
- metrics.add(metric);
+ synchronized (metrics) {
+ for (MetricDump metric : dumpedMetrics) {
+ metrics.add(metric);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b896b4b6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java
index 32cda7f..e2d9bef 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java
@@ -17,7 +17,6 @@
*/
package org.apache.flink.runtime.webmonitor.utils;
-import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.IOMetrics;
@@ -26,7 +25,10 @@ import org.apache.flink.runtime.webmonitor.handlers.JobVertexDetailsHandler;
import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
+import com.fasterxml.jackson.core.JsonGenerator;
+
import javax.annotation.Nullable;
+
import java.io.IOException;
/**
@@ -69,13 +71,16 @@ public class MutableIOMetrics extends IOMetrics {
} else { // execAttempt is still running, use MetricQueryService instead
if (fetcher != null) {
fetcher.update();
- MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(jobID, taskID, attempt.getParallelSubtaskIndex());
- if (metrics != null) {
- this.numBytesInLocal += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0"));
- this.numBytesInRemote += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"));
- this.numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"));
- this.numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"));
- this.numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"));
+ MetricStore metricStore = fetcher.getMetricStore();
+ synchronized (metricStore) {
+ MetricStore.SubtaskMetricStore metrics = metricStore.getSubtaskMetricStore(jobID, taskID, attempt.getParallelSubtaskIndex());
+ if (metrics != null) {
+ this.numBytesInLocal += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0"));
+ this.numBytesInRemote += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"));
+ this.numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"));
+ this.numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"));
+ this.numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"));
+ }
}
}
}