You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/10/18 13:48:53 UTC

flink git commit: [FLINK-7368][metrics] Backport synchronization fix for MetricStore

Repository: flink
Updated Branches:
  refs/heads/release-1.3 80c23de70 -> b896b4b65


[FLINK-7368][metrics] Backport synchronization fix for MetricStore

This closes #4841.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b896b4b6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b896b4b6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b896b4b6

Branch: refs/heads/release-1.3
Commit: b896b4b65c58ebcdeb5b318960823b81015c3a0c
Parents: 80c23de
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Mon Oct 16 17:17:26 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Oct 18 15:48:00 2017 +0200

----------------------------------------------------------------------
 .../handlers/TaskManagersHandler.java           | 131 ++++++++++---------
 .../webmonitor/metrics/MetricFetcher.java       |  17 ++-
 .../webmonitor/utils/MutableIOMetrics.java      |  21 +--
 3 files changed, 91 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b896b4b6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
index a23e983..3782ec9 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceID;
@@ -28,15 +27,18 @@ import org.apache.flink.runtime.messages.JobManagerMessages.TaskManagerInstance;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
 import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
 import org.apache.flink.util.StringUtils;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+
+import com.fasterxml.jackson.core.JsonGenerator;
 
 import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
 import static java.util.Objects.requireNonNull;
 
 public class TaskManagersHandler extends AbstractJsonRequestHandler  {
@@ -109,67 +111,70 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler  {
 					// only send metrics when only one task manager requests them.
 					if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
 						fetcher.update();
-						MetricStore.TaskManagerMetricStore metrics = fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString());
-						if (metrics != null) {
-							gen.writeObjectFieldStart("metrics");
-							long heapUsed = Long.valueOf( metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));
-							long heapCommitted = Long.valueOf( metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0"));
-							long heapTotal = Long.valueOf( metrics.getMetric("Status.JVM.Memory.Heap.Max", "0"));
-
-							gen.writeNumberField("heapCommitted", heapCommitted);
-							gen.writeNumberField("heapUsed", heapUsed);
-							gen.writeNumberField("heapMax", heapTotal);
-
-							long nonHeapUsed = Long.valueOf( metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0"));
-							long nonHeapCommitted = Long.valueOf( metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0"));
-							long nonHeapTotal = Long.valueOf( metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0"));
-
-							gen.writeNumberField("nonHeapCommitted", nonHeapCommitted);
-							gen.writeNumberField("nonHeapUsed", nonHeapUsed);
-							gen.writeNumberField("nonHeapMax", nonHeapTotal);
-
-							gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted);
-							gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed);
-							gen.writeNumberField("totalMax", heapTotal + nonHeapTotal);
-
-							long directCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0"));
-							long directUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0"));
-							long directMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0"));
-
-							gen.writeNumberField("directCount", directCount);
-							gen.writeNumberField("directUsed", directUsed);
-							gen.writeNumberField("directMax", directMax);
-
-							long mappedCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0"));
-							long mappedUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0"));
-							long mappedMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0"));
-
-							gen.writeNumberField("mappedCount", mappedCount);
-							gen.writeNumberField("mappedUsed", mappedUsed);
-							gen.writeNumberField("mappedMax", mappedMax);
-
-							long memorySegmentsAvailable = Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0"));
-							long memorySegmentsTotal = Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0"));
-
-							gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable);
-							gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal);
-
-							gen.writeArrayFieldStart("garbageCollectors");
-
-							for (String gcName : metrics.garbageCollectorNames) {
-								String count = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null);
-								String time = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null);
-								if (count != null  && time != null) {
-									gen.writeStartObject();
-									gen.writeStringField("name", gcName);
-									gen.writeNumberField("count", Long.valueOf(count));
-									gen.writeNumberField("time", Long.valueOf(time));
-									gen.writeEndObject();
+						MetricStore metricStore = fetcher.getMetricStore();
+						synchronized (metricStore) {
+							MetricStore.TaskManagerMetricStore metrics = metricStore.getTaskManagerMetricStore(instance.getId().toString());
+							if (metrics != null) {
+								gen.writeObjectFieldStart("metrics");
+								long heapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));
+								long heapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0"));
+								long heapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0"));
+
+								gen.writeNumberField("heapCommitted", heapCommitted);
+								gen.writeNumberField("heapUsed", heapUsed);
+								gen.writeNumberField("heapMax", heapTotal);
+
+								long nonHeapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0"));
+								long nonHeapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0"));
+								long nonHeapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0"));
+
+								gen.writeNumberField("nonHeapCommitted", nonHeapCommitted);
+								gen.writeNumberField("nonHeapUsed", nonHeapUsed);
+								gen.writeNumberField("nonHeapMax", nonHeapTotal);
+
+								gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted);
+								gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed);
+								gen.writeNumberField("totalMax", heapTotal + nonHeapTotal);
+
+								long directCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0"));
+								long directUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0"));
+								long directMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0"));
+
+								gen.writeNumberField("directCount", directCount);
+								gen.writeNumberField("directUsed", directUsed);
+								gen.writeNumberField("directMax", directMax);
+
+								long mappedCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0"));
+								long mappedUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0"));
+								long mappedMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0"));
+
+								gen.writeNumberField("mappedCount", mappedCount);
+								gen.writeNumberField("mappedUsed", mappedUsed);
+								gen.writeNumberField("mappedMax", mappedMax);
+
+								long memorySegmentsAvailable = Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0"));
+								long memorySegmentsTotal = Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0"));
+
+								gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable);
+								gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal);
+
+								gen.writeArrayFieldStart("garbageCollectors");
+
+								for (String gcName : metrics.garbageCollectorNames) {
+									String count = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null);
+									String time = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null);
+									if (count != null && time != null) {
+										gen.writeStartObject();
+										gen.writeStringField("name", gcName);
+										gen.writeNumberField("count", Long.valueOf(count));
+										gen.writeNumberField("time", Long.valueOf(time));
+										gen.writeEndObject();
+									}
 								}
-							}
 
-							gen.writeEndArray();
-							gen.writeEndObject();
+								gen.writeEndArray();
+								gen.writeEndObject();
+							}
 						}
 					}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b896b4b6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
index 4f92148..667cc22 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
@@ -30,23 +30,24 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
+import org.apache.flink.runtime.metrics.dump.MetricDump;
 import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
 import org.apache.flink.runtime.metrics.dump.MetricQueryService;
-import org.apache.flink.runtime.metrics.dump.MetricDump;
 import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
 import scala.Option;
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
 import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpDeserializer;
 
 /**
@@ -194,8 +195,10 @@ public class MetricFetcher {
 	private void addMetrics(Object result) {
 		MetricDumpSerialization.MetricSerializationResult data = (MetricDumpSerialization.MetricSerializationResult) result;
 		List<MetricDump> dumpedMetrics = deserializer.deserialize(data);
-		for (MetricDump metric : dumpedMetrics) {
-			metrics.add(metric);
+		synchronized (metrics) {
+			for (MetricDump metric : dumpedMetrics) {
+				metrics.add(metric);
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b896b4b6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java
index 32cda7f..e2d9bef 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java
@@ -17,7 +17,6 @@
  */
 package org.apache.flink.runtime.webmonitor.utils;
 
-import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.IOMetrics;
@@ -26,7 +25,10 @@ import org.apache.flink.runtime.webmonitor.handlers.JobVertexDetailsHandler;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
 import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
 
+import com.fasterxml.jackson.core.JsonGenerator;
+
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 
 /**
@@ -69,13 +71,16 @@ public class MutableIOMetrics extends IOMetrics {
 		} else { // execAttempt is still running, use MetricQueryService instead
 			if (fetcher != null) {
 				fetcher.update();
-				MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(jobID, taskID, attempt.getParallelSubtaskIndex());
-				if (metrics != null) {
-					this.numBytesInLocal += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0"));
-					this.numBytesInRemote += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"));
-					this.numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"));
-					this.numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"));
-					this.numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"));
+				MetricStore metricStore = fetcher.getMetricStore();
+				synchronized (metricStore) {
+					MetricStore.SubtaskMetricStore metrics = metricStore.getSubtaskMetricStore(jobID, taskID, attempt.getParallelSubtaskIndex());
+					if (metrics != null) {
+						this.numBytesInLocal += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0"));
+						this.numBytesInRemote += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"));
+						this.numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"));
+						this.numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"));
+						this.numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"));
+					}
 				}
 			}
 		}