Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/16 12:20:57 UTC

[GitHub] [flink] azagrebin commented on a change in pull request #13547: [FLINK-14406][runtime] Exposes managed memory usage through the REST API

azagrebin commented on a change in pull request #13547:
URL: https://github.com/apache/flink/pull/13547#discussion_r501523557



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
##########
@@ -598,6 +598,15 @@ public long availableMemory() {
 		return memoryBudget.getAvailableMemorySize();
 	}
 
+	/**
+	 * Returns the used amount of memory handled by this memory manager.
+	 *
+	 * @return The used amount of memory.
+	 */
+	public long usedMemory() {
+		return memoryBudget.getUsedMemorySize();

Review comment:
       This could already be computed as `getMemorySize() - availableMemory()` without touching `UnsafeMemoryBudget`.
   Moreover, I am not sure about the general purpose of this method; maybe we could just calculate `getMemorySize() - availableMemory()` where we query the metrics.
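A minimal sketch of the suggested derivation, assuming only the two accessors named in the comment (`getMemorySize()` and `availableMemory()`). The `MemorySizeView` interface and the `ManagedMemoryUsage` helper are hypothetical stand-ins for `MemoryManager`, so the snippet compiles on its own:

```java
// Hypothetical view exposing the two MemoryManager accessors named in the comment.
interface MemorySizeView {
    long getMemorySize();   // total managed memory, in bytes
    long availableMemory(); // managed memory not yet reserved, in bytes
}

// Hypothetical helper: derives "used" memory without a dedicated budget method.
final class ManagedMemoryUsage {
    private ManagedMemoryUsage() {}

    static long usedMemory(MemorySizeView memory) {
        return memory.getMemorySize() - memory.availableMemory();
    }

    public static void main(String[] args) {
        // A fixed 128 MiB budget with 32 MiB still available.
        MemorySizeView fixed = new MemorySizeView() {
            @Override public long getMemorySize() { return 128L << 20; }
            @Override public long availableMemory() { return 32L << 20; }
        };
        System.out.println(usedMemory(fixed)); // 100663296 (96 MiB)
    }
}
```

Since the two values are read separately, the result is a point-in-time approximation under concurrent reservations, which is usually acceptable for a gauge.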

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
##########
@@ -34,15 +34,16 @@
 import org.apache.flink.runtime.metrics.ReporterSetup;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
-import org.apache.flink.runtime.metrics.scope.ScopeFormats;
 import org.apache.flink.runtime.metrics.util.TestReporter;
+import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;

Review comment:
       I think this kind of commit should rather be called `[hotfix]`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
##########
@@ -1776,6 +1776,25 @@ public ResourceID getResourceID() {
 		return unresolvedTaskManagerLocation.getResourceID();
 	}
 
+	public long getUsedManagedMemory() {

Review comment:
       I am not sure about the general purpose of this method.
   Could we have it in `MetricUtils` instead?
   `taskExecutorServices.getTaskSlotTable()` and `taskExecutorServices.getManagedMemorySize()` are already available in `TaskManagerRunner#startTaskManager`, so they could be used there in `instantiateManagedMemoryMetrics` instead of in `TaskExecutor`, which already has a lot of responsibilities. I think we should then pass `TaskSlotTable` and `managedMemorySize` to `instantiateManagedMemoryMetrics` to simplify testing.
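A minimal sketch of the suggested refactoring, assuming the gauges can be computed from the allocated slots and the configured managed memory size. `SlotMemory`, `LongGaugeGroup`, and this `instantiateManagedMemoryMetrics` signature are hypothetical simplifications, not Flink's `TaskSlotTable` or `MetricGroup` APIs; the point is only that injecting the dependencies lets a test register the gauges against fakes instead of a full `TaskExecutor`:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical per-slot view standing in for a slot's managed memory usage.
interface SlotMemory {
    long usedManagedMemory();
}

// Hypothetical minimal metric group that only registers long-valued gauges.
interface LongGaugeGroup {
    void gauge(String name, LongSupplier value);
}

final class ManagedMemoryMetrics {
    private ManagedMemoryMetrics() {}

    // All dependencies are passed in, so tests can supply fakes.
    static void instantiateManagedMemoryMetrics(
            LongGaugeGroup group,
            Supplier<Iterable<? extends SlotMemory>> allocatedSlots,
            long managedMemorySize) {
        // Sums the usage of the currently allocated slots each time it is read.
        group.gauge("Used", () -> {
            long used = 0L;
            for (SlotMemory slot : allocatedSlots.get()) {
                used += slot.usedManagedMemory();
            }
            return used;
        });
        group.gauge("Max", () -> managedMemorySize);
    }

    public static void main(String[] args) {
        Map<String, LongSupplier> registered = new LinkedHashMap<>();
        List<SlotMemory> slots = new ArrayList<>();
        slots.add(() -> 10L);
        slots.add(() -> 5L);
        instantiateManagedMemoryMetrics(registered::put, () -> slots, 100L);
        // Prints "Used=15" and then "Max=100".
        registered.forEach((name, gauge) -> System.out.println(name + "=" + gauge.getAsLong()));
    }
}
```

Because the `Used` gauge re-reads the slots on every call, it also reflects slots allocated after registration.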

##########
File path: docs/monitoring/metrics.md
##########
@@ -919,7 +919,18 @@ Thus, in order to infer the metric identifier:
       <td>Mapped.TotalCapacity</td>
       <td>The number of buffers in the mapped buffer pool (in bytes).</td>
       <td>Gauge</td>
-    </tr>                                                         
+    </tr>
+    <tr>
+      <td rowspan="2">Status.ManagedMemory</td>

Review comment:
       It is a pity we did not name `Status.JVM.Memory` as `Status.Memory.JVM`.
   The managed memory metrics could be `Status.Memory.Managed`; maybe we should name them like this to establish such a convention. Later, the JVM metrics might also be refactored.
   @zentol, any further opinions about the naming?
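To illustrate the proposed convention, here is a small sketch in which nested groups append one name component each, mirroring in simplified form how nested metric groups yield dotted identifiers. `ScopedGroup` is hypothetical and is not Flink's `MetricGroup`:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical scoped group: each nested group appends one name component,
// and identifiers are the path of group names joined with '.'.
final class ScopedGroup {
    private final List<String> scope;

    private ScopedGroup(List<String> scope) {
        this.scope = Collections.unmodifiableList(scope);
    }

    static ScopedGroup root(String name) {
        List<String> scope = new ArrayList<>();
        scope.add(name);
        return new ScopedGroup(scope);
    }

    ScopedGroup addGroup(String name) {
        List<String> child = new ArrayList<>(scope);
        child.add(name);
        return new ScopedGroup(child);
    }

    String metricIdentifier(String metricName) {
        return String.join(".", scope) + "." + metricName;
    }

    public static void main(String[] args) {
        // Shared "Status.Memory" parent for both kinds of memory metrics.
        ScopedGroup memory = root("Status").addGroup("Memory");
        System.out.println(memory.addGroup("Managed").metricIdentifier("Used"));              // Status.Memory.Managed.Used
        System.out.println(memory.addGroup("JVM").addGroup("Heap").metricIdentifier("Used")); // Status.Memory.JVM.Heap.Used
    }
}
```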

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
##########
@@ -81,7 +80,7 @@ public static ProcessMetricGroup instantiateProcessMetricGroup(
 			final Optional<Time> systemResourceProbeInterval) {
 		final ProcessMetricGroup processMetricGroup = ProcessMetricGroup.create(metricRegistry, hostname);
 
-		createAndInitializeStatusMetricGroup(processMetricGroup);
+		instantiateStatusMetricGroup(processMetricGroup);

Review comment:
       Also, I am not sure the renaming is needed, weighing the preservation of the git history against such minor changes. The same applies to `getActiveTaskAllocationIdsPerJob`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
##########
@@ -199,10 +208,11 @@ default int freeSlot(AllocationID allocationId) throws SlotNotFoundException {
 	/**
 	 * Return an iterator of allocated slots for the given job id.
 	 *
-	 * @param jobId for which to return the allocated slots
+	 * @param jobId for which to return the allocated slots. If {@code null} is passed,
+	 * all allocated slots held by this {@code TaskSlotTable} are returned.
 	 * @return Iterator of allocated slots.
 	 */
-	Iterator<TaskSlot<T>> getAllocatedSlots(JobID jobId);
+	Iterator<TaskSlot<T>> getAllocatedSlots(@Nullable JobID jobId);

Review comment:
       I somewhat preferred the previous version, which had separate methods with separate, simpler responsibilities, but maybe that is a matter of personal taste.
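The two API shapes can be contrasted in a small sketch. Everything here is hypothetical: `String` stands in for `JobID`, and `getAllocatedSlotsOf`/`getAllAllocatedSlots` are illustrative names, not the methods of the earlier version. It also shows that the nullable variant can be layered on top of the two single-purpose methods:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

// Hypothetical slot: only records the (stand-in) job id that holds it.
final class Slot {
    final String jobId;

    Slot(String jobId) {
        this.jobId = Objects.requireNonNull(jobId);
    }
}

// Hypothetical table contrasting the two API shapes discussed in the comment.
final class SlotTableSketch {
    private final List<Slot> allocated = new ArrayList<>();

    void allocate(Slot slot) {
        allocated.add(slot);
    }

    // Shape A: a single method where null means "all jobs".
    Iterator<Slot> getAllocatedSlots(String jobIdOrNull) {
        return jobIdOrNull == null ? getAllAllocatedSlots() : getAllocatedSlotsOf(jobIdOrNull);
    }

    // Shape B: separate methods, each with one responsibility and no null contract.
    Iterator<Slot> getAllocatedSlotsOf(String jobId) {
        Objects.requireNonNull(jobId, "jobId");
        List<Slot> result = new ArrayList<>();
        for (Slot slot : allocated) {
            if (slot.jobId.equals(jobId)) {
                result.add(slot);
            }
        }
        return result.iterator();
    }

    Iterator<Slot> getAllAllocatedSlots() {
        return new ArrayList<>(allocated).iterator();
    }

    // Counts the remaining elements of an iterator.
    static int count(Iterator<?> it) {
        int n = 0;
        while (it.hasNext()) {
            it.next();
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        SlotTableSketch table = new SlotTableSketch();
        table.allocate(new Slot("job-a"));
        table.allocate(new Slot("job-b"));
        table.allocate(new Slot("job-a"));
        System.out.println(count(table.getAllocatedSlots(null)));      // 3
        System.out.println(count(table.getAllocatedSlotsOf("job-a"))); // 2
    }
}
```

With shape B, a caller can never accidentally request every slot by passing an uninitialized job id, which is one argument for the separate-methods design.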

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
##########
@@ -81,7 +80,7 @@ public static ProcessMetricGroup instantiateProcessMetricGroup(
 			final Optional<Time> systemResourceProbeInterval) {
 		final ProcessMetricGroup processMetricGroup = ProcessMetricGroup.create(metricRegistry, hostname);
 
-		createAndInitializeStatusMetricGroup(processMetricGroup);
+		instantiateStatusMetricGroup(processMetricGroup);

Review comment:
       I think this kind of commit should rather be called `[hotfix]`.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
##########
@@ -159,6 +173,66 @@ public void testStartupWhenNetworkStackFailsToInitialize() throws Exception {
 		}
 	}
 
+	/**
+	 * Checks that all expected metrics are initialized.
+	 */
+	@Test
+	public void testMetricInitialization() throws Exception {
+		Configuration cfg = createFlinkConfiguration();
+
+		List<String> registeredMetrics = new ArrayList<>();
+		startTaskManager(
+			cfg,
+			rpcService,
+			highAvailabilityServices,
+			TestingMetricRegistry.builder()
+				.setRegisterConsumer((metric, metricName, group) -> registeredMetrics.add(group.getMetricIdentifier(metricName)))
+				.setScopeFormats(ScopeFormats.fromConfig(cfg))
+				.build());
+
+		Set<String> expectedTaskManagerMetrics = Sets.newHashSet(
+			"Status.JVM.ClassLoader.ClassesLoaded",
+			"Status.JVM.ClassLoader.ClassesUnloaded",
+			"Status.JVM.GarbageCollector.scavenge.Count",
+			"Status.JVM.GarbageCollector.scavenge.Time",
+			"Status.JVM.GarbageCollector.global.Count",
+			"Status.JVM.GarbageCollector.global.Time",
+			"Status.JVM.Memory.Heap.Used",
+			"Status.JVM.Memory.Heap.Committed",
+			"Status.JVM.Memory.Heap.Max",
+			"Status.JVM.Memory.NonHeap.Used",
+			"Status.JVM.Memory.NonHeap.Committed",
+			"Status.JVM.Memory.NonHeap.Max",
+			"Status.JVM.Memory.Direct.Count",
+			"Status.JVM.Memory.Direct.MemoryUsed",
+			"Status.JVM.Memory.Direct.TotalCapacity",
+			"Status.JVM.Memory.Mapped.Count",
+			"Status.JVM.Memory.Mapped.MemoryUsed",
+			"Status.JVM.Memory.Mapped.TotalCapacity",
+			"Status.JVM.Threads.Count",
+			"Status.JVM.CPU.Load",
+			"Status.JVM.CPU.Time",
+			"Status.Network.TotalMemorySegments",
+			"Status.Network.AvailableMemorySegments",
+			"Status.Shuffle.Netty.TotalMemorySegments",
+			"Status.Shuffle.Netty.TotalMemory",
+			"Status.Shuffle.Netty.AvailableMemorySegments",
+			"Status.Shuffle.Netty.AvailableMemory",
+			"Status.Shuffle.Netty.UsedMemorySegments",
+			"Status.Shuffle.Netty.UsedMemory",
+			"Status.ManagedMemory.Used",
+			"Status.ManagedMemory.Max"
+		);
+
+		assertThat(registeredMetrics.size(), is(expectedTaskManagerMetrics.size()));
+		registeredMetrics.forEach(metric -> {
+			assertThat(metric, startsWith(".taskmanager."));
+			String metricSuffix = metric.replaceAll("\\.taskmanager\\.[^.]+\\.", "");

Review comment:
       Why not include `".taskmanager."` in `expectedTaskManagerMetrics` and do:
   `assertThat(registeredMetrics, containsInAnyOrder(expectedTaskManagerMetrics));`
   (`expectedTaskManagerMetrics` would then have to be a `String[]` array)?
   
   There is also `assertThat(s2, everyItem(isIn(s1)));` for checking a subset.
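Because Hamcrest is a test dependency, the sketch below expresses the same two checks with the Java standard library only, so it runs on its own. `MetricAssertions` and its methods are hypothetical: `containsInAnyOrder` compares element counts regardless of order (like Hamcrest's `containsInAnyOrder`), and `everyItemIn` checks the subset relation (like `everyItem(isIn(...))`). The `tm-1` component in the identifiers is made up; as the regex in the test suggests, real identifiers carry an extra scope component after `.taskmanager.`:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stdlib-only equivalents of the two Hamcrest checks.
final class MetricAssertions {
    private MetricAssertions() {}

    // Same elements with the same multiplicities, in any order.
    static boolean containsInAnyOrder(List<String> actual, String... expected) {
        return counts(actual).equals(counts(Arrays.asList(expected)));
    }

    // Every actual element appears in the superset.
    static boolean everyItemIn(List<String> actual, Set<String> superset) {
        return superset.containsAll(actual);
    }

    private static Map<String, Integer> counts(List<String> items) {
        Map<String, Integer> counts = new HashMap<>();
        for (String item : items) {
            counts.merge(item, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> registered = List.of(
            ".taskmanager.tm-1.Status.ManagedMemory.Used",
            ".taskmanager.tm-1.Status.ManagedMemory.Max");
        System.out.println(containsInAnyOrder(registered,
            ".taskmanager.tm-1.Status.ManagedMemory.Max",
            ".taskmanager.tm-1.Status.ManagedMemory.Used")); // true
        System.out.println(everyItemIn(
            List.of(".taskmanager.tm-1.Status.ManagedMemory.Used"),
            new HashSet<>(registered))); // true
    }
}
```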




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org