You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/10/12 14:35:08 UTC

flink git commit: [FLINK-7818] Synchronize MetricStore access in TaskManagersHandler

Repository: flink
Updated Branches:
  refs/heads/master 4f8d01fba -> 742e4a0ff


[FLINK-7818] Synchronize MetricStore access in TaskManagersHandler

This closes #4811.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/742e4a0f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/742e4a0f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/742e4a0f

Branch: refs/heads/master
Commit: 742e4a0ffd2fb244654e97098d0b23100789d4e9
Parents: 4f8d01f
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Oct 11 19:00:16 2017 +0200
Committer: Till <ti...@gmail.com>
Committed: Thu Oct 12 16:34:37 2017 +0200

----------------------------------------------------------------------
 .../handler/legacy/TaskManagersHandler.java     | 122 ++++++++++---------
 1 file changed, 63 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/742e4a0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
index 0880d0c..e608b99 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
@@ -130,67 +130,71 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler  {
 			// only send metrics when only one task manager requests them.
 			if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
 				fetcher.update();
-				MetricStore.TaskManagerMetricStore metrics = fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString());
-				if (metrics != null) {
-					gen.writeObjectFieldStart("metrics");
-					long heapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));
-					long heapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0"));
-					long heapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0"));
-
-					gen.writeNumberField("heapCommitted", heapCommitted);
-					gen.writeNumberField("heapUsed", heapUsed);
-					gen.writeNumberField("heapMax", heapTotal);
-
-					long nonHeapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0"));
-					long nonHeapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0"));
-					long nonHeapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0"));
-
-					gen.writeNumberField("nonHeapCommitted", nonHeapCommitted);
-					gen.writeNumberField("nonHeapUsed", nonHeapUsed);
-					gen.writeNumberField("nonHeapMax", nonHeapTotal);
-
-					gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted);
-					gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed);
-					gen.writeNumberField("totalMax", heapTotal + nonHeapTotal);
-
-					long directCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0"));
-					long directUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0"));
-					long directMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0"));
-
-					gen.writeNumberField("directCount", directCount);
-					gen.writeNumberField("directUsed", directUsed);
-					gen.writeNumberField("directMax", directMax);
-
-					long mappedCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0"));
-					long mappedUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0"));
-					long mappedMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0"));
-
-					gen.writeNumberField("mappedCount", mappedCount);
-					gen.writeNumberField("mappedUsed", mappedUsed);
-					gen.writeNumberField("mappedMax", mappedMax);
-
-					long memorySegmentsAvailable = Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0"));
-					long memorySegmentsTotal = Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0"));
-
-					gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable);
-					gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal);
-
-					gen.writeArrayFieldStart("garbageCollectors");
-
-					for (String gcName : metrics.garbageCollectorNames) {
-						String count = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null);
-						String time = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null);
-						if (count != null  && time != null) {
-							gen.writeStartObject();
-							gen.writeStringField("name", gcName);
-							gen.writeNumberField("count", Long.valueOf(count));
-							gen.writeNumberField("time", Long.valueOf(time));
-							gen.writeEndObject();
+				final MetricStore metricStore = fetcher.getMetricStore();
+
+				synchronized (metricStore) {
+					MetricStore.TaskManagerMetricStore metrics = metricStore.getTaskManagerMetricStore(instance.getId().toString());
+					if (metrics != null) {
+						gen.writeObjectFieldStart("metrics");
+						long heapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));
+						long heapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0"));
+						long heapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0"));
+
+						gen.writeNumberField("heapCommitted", heapCommitted);
+						gen.writeNumberField("heapUsed", heapUsed);
+						gen.writeNumberField("heapMax", heapTotal);
+
+						long nonHeapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0"));
+						long nonHeapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0"));
+						long nonHeapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0"));
+
+						gen.writeNumberField("nonHeapCommitted", nonHeapCommitted);
+						gen.writeNumberField("nonHeapUsed", nonHeapUsed);
+						gen.writeNumberField("nonHeapMax", nonHeapTotal);
+
+						gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted);
+						gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed);
+						gen.writeNumberField("totalMax", heapTotal + nonHeapTotal);
+
+						long directCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0"));
+						long directUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0"));
+						long directMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0"));
+
+						gen.writeNumberField("directCount", directCount);
+						gen.writeNumberField("directUsed", directUsed);
+						gen.writeNumberField("directMax", directMax);
+
+						long mappedCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0"));
+						long mappedUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0"));
+						long mappedMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0"));
+
+						gen.writeNumberField("mappedCount", mappedCount);
+						gen.writeNumberField("mappedUsed", mappedUsed);
+						gen.writeNumberField("mappedMax", mappedMax);
+
+						long memorySegmentsAvailable = Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0"));
+						long memorySegmentsTotal = Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0"));
+
+						gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable);
+						gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal);
+
+						gen.writeArrayFieldStart("garbageCollectors");
+
+						for (String gcName : metrics.garbageCollectorNames) {
+							String count = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null);
+							String time = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null);
+							if (count != null && time != null) {
+								gen.writeStartObject();
+								gen.writeStringField("name", gcName);
+								gen.writeNumberField("count", Long.valueOf(count));
+								gen.writeNumberField("time", Long.valueOf(time));
+								gen.writeEndObject();
+							}
 						}
-					}
 
-					gen.writeEndArray();
-					gen.writeEndObject();
+						gen.writeEndArray();
+						gen.writeEndObject();
+					}
 				}
 			}