Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/14 18:20:37 UTC

[GitHub] [flink] rmetzger commented on a change in pull request #13547: [FLINK-14406][runtime] Exposes managed memory usage through the REST API

rmetzger commented on a change in pull request #13547:
URL: https://github.com/apache/flink/pull/13547#discussion_r504815681



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
##########
@@ -1773,6 +1773,25 @@ public ResourceID getResourceID() {
 		return unresolvedTaskManagerLocation.getResourceID();
 	}
 
+	public long getUsedManagedMemory() {
+		Set<AllocationID> activeTaskAllocationIds = taskSlotTable.getActiveTaskSlotAllocationIds();
+
+		long usedMemory = 0L;
+		for (AllocationID allocationID : activeTaskAllocationIds) {
+			try {
+				usedMemory += taskSlotTable.getTaskMemoryManager(allocationID).usedMemory();
+			} catch (SlotNotFoundException e) {
+				log.debug("The task slot {} is not present anymore and will be ignore in calculating the amount of used memory.", e.getSlotID());

Review comment:
       ```suggestion
   				log.debug("The task slot {} is not present anymore and will be ignored in calculating the amount of used memory.", e.getSlotID());
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
##########
@@ -132,6 +136,17 @@ public static void instantiateStatusMetrics(
 		instantiateCPUMetrics(jvm.addGroup("CPU"));
 	}
 
+	public static void instantiateManagedMemoryMetrics(
+			MetricGroup metricGroup,
+			TaskExecutor taskExecutor) {
+		checkNotNull(metricGroup);
+		checkNotNull(taskExecutor);
+
+		MetricGroup managedMemoryMetricGroup = metricGroup.addGroup(METRIC_GROUP_MANAGED_MEMORY);
+		managedMemoryMetricGroup.gauge(MetricNames.MEMORY_USED, taskExecutor::getUsedManagedMemory);
+		managedMemoryMetricGroup.gauge(MetricNames.MEMORY_MAX, taskExecutor::getTotalManagedMemory);

Review comment:
       I'm not sure "Max" is a good word here. In the context of the GC metrics, it implies "maximum observed value". Can't we use "Total"?

##########
File path: docs/monitoring/metrics.md
##########
@@ -919,7 +919,18 @@ Thus, in order to infer the metric identifier:
       <td>Mapped.TotalCapacity</td>
       <td>The number of buffers in the mapped buffer pool (in bytes).</td>
       <td>Gauge</td>
-    </tr>                                                         
+    </tr>
+    <tr>
+      <td rowspan="2">Status.ManagedMemory</td>
+      <td>Used</td>
+      <td>The amount of managed memory currently used.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>Total</td>
+      <td>The total amount of managed memory.</td>
+      <td>Gauge</td>
+    </tr>

Review comment:
       It seems that `Status.ManagedMemory.Max` is not documented? (maybe I'm misreading something here)

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
##########
@@ -1773,6 +1773,25 @@ public ResourceID getResourceID() {
 		return unresolvedTaskManagerLocation.getResourceID();
 	}
 
+	public long getUsedManagedMemory() {
+		Set<AllocationID> activeTaskAllocationIds = taskSlotTable.getActiveTaskSlotAllocationIds();
+
+		long usedMemory = 0L;
+		for (AllocationID allocationID : activeTaskAllocationIds) {
+			try {
+				usedMemory += taskSlotTable.getTaskMemoryManager(allocationID).usedMemory();
+			} catch (SlotNotFoundException e) {
+				log.debug("The task slot {} is not present anymore and will be ignore in calculating the amount of used memory.", e.getSlotID());

Review comment:
       I'm not sure if the changes to the `SlotNotFoundException` are needed at all. We have access to the same `allocationID` in the catch clause as well.
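
   A minimal sketch of what I mean, using hypothetical stand-ins (`SlotTable`, `String` allocation ids, `System.out` in place of the logger) rather than the real Flink classes. The point is only that the loop variable is still in scope inside the catch clause, so the exception does not need a new field or getter:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-ins for the Flink types; only the control flow matters here.
public class UsedManagedMemorySketch {

	/** Stand-in for SlotNotFoundException, left without the additional getter. */
	static class SlotNotFoundException extends Exception {
		SlotNotFoundException(String allocationId) {
			super("Could not find slot for " + allocationId + '.');
		}
	}

	/** Stand-in for the slot table: maps an allocation id to the memory used in its slot. */
	static class SlotTable {
		private final Map<String, Long> usedMemoryBySlot = new LinkedHashMap<>();

		void put(String allocationId, long usedMemory) {
			usedMemoryBySlot.put(allocationId, usedMemory);
		}

		long usedMemory(String allocationId) throws SlotNotFoundException {
			Long used = usedMemoryBySlot.get(allocationId);
			if (used == null) {
				throw new SlotNotFoundException(allocationId);
			}
			return used;
		}
	}

	static long getUsedManagedMemory(SlotTable table, Set<String> activeAllocationIds) {
		long usedMemory = 0L;
		for (String allocationId : activeAllocationIds) {
			try {
				usedMemory += table.usedMemory(allocationId);
			} catch (SlotNotFoundException e) {
				// The loop variable is in scope here, so nothing has to be read from the exception.
				System.out.println("The task slot " + allocationId
					+ " is not present anymore and will be ignored in calculating the amount of used memory.");
			}
		}
		return usedMemory;
	}

	public static void main(String[] args) {
		SlotTable table = new SlotTable();
		table.put("a", 64L);
		table.put("b", 32L);
		// "gone" is still reported as active but its slot has already been freed.
		Set<String> activeIds = new LinkedHashSet<>(Arrays.asList("a", "gone", "b"));
		System.out.println(getUsedManagedMemory(table, activeIds));
	}
}
```

   In the actual PR this would amount to passing `allocationID` to `log.debug` and reverting the `SlotNotFoundException` changes.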

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
##########
@@ -246,10 +256,11 @@ default int freeSlot(AllocationID allocationId) throws SlotNotFoundException {
 	/**
 	 * Return an iterator over all tasks for a given job.
 	 *
-	 * @param jobId identifying the job of the requested tasks
+	 * @param jobId identifying the job of the requested tasks. If {@code null} is passed,
+	 * all tasks held by this {@code TaskSlotTable} are returned.
 	 * @return Iterator over all task for a given job
 	 */
-	Iterator<T> getTasks(JobID jobId);
+	Iterator<T> getTasks(@Nullable JobID jobId);

Review comment:
       Why is this change necessary?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
##########
@@ -199,10 +208,11 @@ default int freeSlot(AllocationID allocationId) throws SlotNotFoundException {
 	/**
 	 * Return an iterator of allocated slots for the given job id.
 	 *
-	 * @param jobId for which to return the allocated slots
+	 * @param jobId for which to return the allocated slots. If {@code null} is passed,
+	 * all allocated slots held by this {@code TaskSlotTable} are returned.
 	 * @return Iterator of allocated slots.
 	 */
-	Iterator<TaskSlot<T>> getAllocatedSlots(JobID jobId);
+	Iterator<TaskSlot<T>> getAllocatedSlots(@Nullable JobID jobId);

Review comment:
       Why is this change necessary?
   
   

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotNotFoundException.java
##########
@@ -27,11 +27,14 @@
 
 	private static final long serialVersionUID = -883614807750137925L;
 
+	private final AllocationID allocationID;
+
 	public SlotNotFoundException(AllocationID allocationId) {
-		this("Could not find slot for " + allocationId + '.');
+		super("Could not find slot for " + allocationId + '.');
+		this.allocationID = allocationId;
 	}
 
-	public SlotNotFoundException(String message) {
-		super(message);
+	public AllocationID getSlotID() {

Review comment:
       Why isn't this called `getAllocationID`?



