You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/10/12 14:35:08 UTC
flink git commit: [FLINK-7818] Synchronize MetricStore access in TaskManagersHandler
Repository: flink
Updated Branches:
refs/heads/master 4f8d01fba -> 742e4a0ff
[FLINK-7818] Synchronize MetricStore access in TaskManagersHandler
This closes #4811.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/742e4a0f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/742e4a0f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/742e4a0f
Branch: refs/heads/master
Commit: 742e4a0ffd2fb244654e97098d0b23100789d4e9
Parents: 4f8d01f
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Oct 11 19:00:16 2017 +0200
Committer: Till <ti...@gmail.com>
Committed: Thu Oct 12 16:34:37 2017 +0200
----------------------------------------------------------------------
.../handler/legacy/TaskManagersHandler.java | 122 ++++++++++---------
1 file changed, 63 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/742e4a0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
index 0880d0c..e608b99 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
@@ -130,67 +130,71 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler {
// only send metrics when only one task manager requests them.
if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
fetcher.update();
- MetricStore.TaskManagerMetricStore metrics = fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString());
- if (metrics != null) {
- gen.writeObjectFieldStart("metrics");
- long heapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));
- long heapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0"));
- long heapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0"));
-
- gen.writeNumberField("heapCommitted", heapCommitted);
- gen.writeNumberField("heapUsed", heapUsed);
- gen.writeNumberField("heapMax", heapTotal);
-
- long nonHeapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0"));
- long nonHeapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0"));
- long nonHeapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0"));
-
- gen.writeNumberField("nonHeapCommitted", nonHeapCommitted);
- gen.writeNumberField("nonHeapUsed", nonHeapUsed);
- gen.writeNumberField("nonHeapMax", nonHeapTotal);
-
- gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted);
- gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed);
- gen.writeNumberField("totalMax", heapTotal + nonHeapTotal);
-
- long directCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0"));
- long directUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0"));
- long directMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0"));
-
- gen.writeNumberField("directCount", directCount);
- gen.writeNumberField("directUsed", directUsed);
- gen.writeNumberField("directMax", directMax);
-
- long mappedCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0"));
- long mappedUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0"));
- long mappedMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0"));
-
- gen.writeNumberField("mappedCount", mappedCount);
- gen.writeNumberField("mappedUsed", mappedUsed);
- gen.writeNumberField("mappedMax", mappedMax);
-
- long memorySegmentsAvailable = Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0"));
- long memorySegmentsTotal = Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0"));
-
- gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable);
- gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal);
-
- gen.writeArrayFieldStart("garbageCollectors");
-
- for (String gcName : metrics.garbageCollectorNames) {
- String count = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null);
- String time = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null);
- if (count != null && time != null) {
- gen.writeStartObject();
- gen.writeStringField("name", gcName);
- gen.writeNumberField("count", Long.valueOf(count));
- gen.writeNumberField("time", Long.valueOf(time));
- gen.writeEndObject();
+ final MetricStore metricStore = fetcher.getMetricStore();
+
+ synchronized (metricStore) {
+ MetricStore.TaskManagerMetricStore metrics = metricStore.getTaskManagerMetricStore(instance.getId().toString());
+ if (metrics != null) {
+ gen.writeObjectFieldStart("metrics");
+ long heapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));
+ long heapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0"));
+ long heapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0"));
+
+ gen.writeNumberField("heapCommitted", heapCommitted);
+ gen.writeNumberField("heapUsed", heapUsed);
+ gen.writeNumberField("heapMax", heapTotal);
+
+ long nonHeapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0"));
+ long nonHeapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0"));
+ long nonHeapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0"));
+
+ gen.writeNumberField("nonHeapCommitted", nonHeapCommitted);
+ gen.writeNumberField("nonHeapUsed", nonHeapUsed);
+ gen.writeNumberField("nonHeapMax", nonHeapTotal);
+
+ gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted);
+ gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed);
+ gen.writeNumberField("totalMax", heapTotal + nonHeapTotal);
+
+ long directCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0"));
+ long directUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0"));
+ long directMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0"));
+
+ gen.writeNumberField("directCount", directCount);
+ gen.writeNumberField("directUsed", directUsed);
+ gen.writeNumberField("directMax", directMax);
+
+ long mappedCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0"));
+ long mappedUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0"));
+ long mappedMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0"));
+
+ gen.writeNumberField("mappedCount", mappedCount);
+ gen.writeNumberField("mappedUsed", mappedUsed);
+ gen.writeNumberField("mappedMax", mappedMax);
+
+ long memorySegmentsAvailable = Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0"));
+ long memorySegmentsTotal = Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0"));
+
+ gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable);
+ gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal);
+
+ gen.writeArrayFieldStart("garbageCollectors");
+
+ for (String gcName : metrics.garbageCollectorNames) {
+ String count = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null);
+ String time = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null);
+ if (count != null && time != null) {
+ gen.writeStartObject();
+ gen.writeStringField("name", gcName);
+ gen.writeNumberField("count", Long.valueOf(count));
+ gen.writeNumberField("time", Long.valueOf(time));
+ gen.writeEndObject();
+ }
}
- }
- gen.writeEndArray();
- gen.writeEndObject();
+ gen.writeEndArray();
+ gen.writeEndObject();
+ }
}
}