Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/07/24 16:40:16 UTC

[GitHub] [flink] tillrohrmann commented on a change in pull request #12980: [FLINK-18646] Verify memory manager empty in a separate thread with l…

tillrohrmann commented on a change in pull request #12980:
URL: https://github.com/apache/flink/pull/12980#discussion_r460156112



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
##########
@@ -84,15 +85,22 @@
 	/** The closing future is completed when the slot is freed and closed. */
 	private final CompletableFuture<Void> closingFuture;
 
+	/**
+	 * {@link ExecutorService} for background actions, e.g. verify all managed memory released.
+	 */
+	private final ExecutorService asyncExecutor;

Review comment:
       ```suggestion
   	private final Executor asyncExecutor;
   ```
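
Narrowing the field type to `java.util.concurrent.Executor` means the slot can only submit work. It cannot call `shutdown()` on a pool that `TaskSlotTableImpl` creates and owns. Below is a minimal sketch under that assumption. `SlotSketch` is a hypothetical stand-in for `TaskSlot`, and `runThroughSlot` is a hypothetical owner of the pool.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ExecutorFieldSketch {

    /** Hypothetical stand-in for TaskSlot: can submit work but cannot shut the pool down. */
    static final class SlotSketch {
        private final Executor asyncExecutor; // only execute(Runnable) is visible

        SlotSketch(Executor asyncExecutor) {
            this.asyncExecutor = asyncExecutor;
        }

        void runInBackground(Runnable action) {
            asyncExecutor.execute(action);
        }
    }

    /** Runs n actions through a slot; only the creator of the pool shuts it down. */
    static int runThroughSlot(int n) {
        ExecutorService pool = Executors.newCachedThreadPool(); // owned by this method
        try {
            SlotSketch slot = new SlotSketch(pool); // an ExecutorService is passed as a plain Executor
            AtomicInteger ran = new AtomicInteger();
            CountDownLatch done = new CountDownLatch(n);
            for (int i = 0; i < n; i++) {
                slot.runInBackground(() -> {
                    ran.incrementAndGet();
                    done.countDown();
                });
            }
            done.await(10, TimeUnit.SECONDS);
            return ran.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown(); // lifecycle stays with the owner, not the slot
        }
    }
}
```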

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
##########
@@ -37,9 +37,10 @@
 class UnsafeMemoryBudget {
 	// max. number of sleeps during try-reserving with exponentially
 	// increasing delay before throwing OutOfMemoryError:
-	// 1, 2, 4, 8, 16, 32, 64, 128, 256, 512 (total 1023 ms ~ 1 s)
+	// 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, ... (total (2 x 1024) - 1 ms ~ 2 s)
 	// which means that MemoryReservationException will be thrown after 1 s of trying

Review comment:
       ```suggestion
   	// which means that MemoryReservationException will be thrown after 2 s of trying
   ```
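
The corrected comment follows from the geometric series. When the delay doubles from 1 ms on each of n sleeps, the delays add up to 2^n - 1 ms. Ten sleeps (1 through 512 ms) total 1023 ms, about 1 s. Eleven sleeps (1 through 1024 ms) total 2047 ms, which is the "(2 x 1024) - 1" in the diff and about 2 s. The sketch below only checks that arithmetic. The count of 11 sleeps is inferred from the comment, because the constant itself is not part of this hunk.

```java
public class BackoffTotalSketch {

    /** Sums the delays 1, 2, 4, ..., 2^(sleeps - 1) ms of an exponential backoff. */
    static long totalDelayMillis(int sleeps) {
        long total = 0;
        long delay = 1;
        for (int i = 0; i < sleeps; i++) {
            total += delay;
            delay *= 2; // each retry waits twice as long as the previous one
        }
        return total; // equals 2^sleeps - 1
    }

    public static void main(String[] args) {
        System.out.println(totalDelayMillis(10)); // 1023 ms, the old "~ 1 s"
        System.out.println(totalDelayMillis(11)); // 2047 ms, the new "~ 2 s"
    }
}
```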

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
##########
@@ -84,15 +85,22 @@
 	/** The closing future is completed when the slot is freed and closed. */
 	private final CompletableFuture<Void> closingFuture;
 
+	/**
+	 * {@link ExecutorService} for background actions, e.g. verify all managed memory released.

Review comment:
       ```suggestion
   	 * {@link Executor} for background actions, e.g. verify all managed memory released.
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##########
@@ -134,6 +143,12 @@ public TaskSlotTableImpl(
 		slotActions = null;
 		state = State.CREATED;
 		closingFuture = new CompletableFuture<>();
+
+		asyncExecutor = new ThreadPoolExecutor(
+			0,
+			numberSlots,
+			60L, TimeUnit.SECONDS,
+			new SynchronousQueue<>());

Review comment:
       ```suggestion
   			new SynchronousQueue<>(),
   			new ExecutorThreadFactory("task-slot-memory-verifier"));
   ```
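
Naming the pool's threads makes them identifiable in thread dumps and logs. The suggestion uses Flink's `ExecutorThreadFactory`. So that the sketch below runs without Flink on the classpath, it substitutes a hypothetical hand-rolled `NamedThreadFactory`; this is not Flink's implementation. The pool has the same shape as the diff: zero core threads, at most `numberSlots` threads, a 60 s keep-alive, and a `SynchronousQueue`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedPoolSketch {

    /** Hypothetical stand-in for Flink's ExecutorThreadFactory: prefixes thread names. */
    static final class NamedThreadFactory implements ThreadFactory {
        private final String poolName;
        private final AtomicInteger counter = new AtomicInteger(1);

        NamedThreadFactory(String poolName) {
            this.poolName = poolName;
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, poolName + "-thread-" + counter.getAndIncrement());
            t.setDaemon(true); // sketch choice: never keep the JVM alive
            return t;
        }
    }

    /** Same pool shape as in the diff, plus the named thread factory. */
    static ThreadPoolExecutor createVerifierPool(int numberSlots) {
        return new ThreadPoolExecutor(
            0,
            numberSlots,
            60L, TimeUnit.SECONDS,
            new SynchronousQueue<>(),
            new NamedThreadFactory("task-slot-memory-verifier"));
    }

    /** Runs one task on the pool and returns the name of the thread that executed it. */
    static String threadNameOfTask() {
        ThreadPoolExecutor pool = createVerifierPool(2);
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return pool.submit(task).get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

A `SynchronousQueue` does no buffering. Once `numberSlots` threads are busy, the default `AbortPolicy` rejects any further `execute` with a `RejectedExecutionException`.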

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
##########
@@ -295,22 +303,25 @@ public String toString() {
 				// and set the slot state to releasing so that it gets eventually freed
 				tasks.values().forEach(task -> task.failExternally(cause));
 			}
+
 			final CompletableFuture<Void> cleanupFuture = FutureUtils
 				.waitForAll(tasks.values().stream().map(TaskSlotPayload::getTerminationFuture).collect(Collectors.toList()))
-				.thenRun(() -> {
-					verifyMemoryFreed();
-					this.memoryManager.shutdown();
-				});
-
+				.thenRun(memoryManager::shutdown);
+			verifyAllManagedMemoryIsReleasedAfter(cleanupFuture);

Review comment:
       Maybe add a test which ensures that we can call `MemoryManager.verifyEmpty()` after it has been shut down. That way we explicitly state that this is a contract we want to guard.
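
A contract test of the kind suggested might take roughly the following shape. The sketch runs against a hypothetical `ToyMemoryManager` rather than Flink's `MemoryManager`, whose constructor and allocation API are not shown in this thread. It illustrates only the steps of the check: reserve, optionally release, shut down, then call `verifyEmpty()` and expect it to return normally.

```java
import java.util.concurrent.atomic.AtomicLong;

public class VerifyEmptyAfterShutdownSketch {

    /** Hypothetical toy stand-in for MemoryManager; not Flink's implementation. */
    static final class ToyMemoryManager {
        private final AtomicLong reservedBytes = new AtomicLong();
        private volatile boolean shutdown;

        void reserve(long bytes) {
            if (shutdown) {
                throw new IllegalStateException("memory manager is shut down");
            }
            reservedBytes.addAndGet(bytes);
        }

        void release(long bytes) {
            reservedBytes.addAndGet(-bytes);
        }

        void shutdown() {
            shutdown = true;
        }

        /** The guarded contract: still callable after shutdown(), only reads state. */
        boolean verifyEmpty() {
            return reservedBytes.get() == 0;
        }
    }

    /** Reserves memory, optionally leaks it, shuts down, then verifies. */
    static boolean verifyEmptyAfterShutdown(boolean leak) {
        ToyMemoryManager memoryManager = new ToyMemoryManager();
        memoryManager.reserve(1024);
        if (!leak) {
            memoryManager.release(1024);
        }
        memoryManager.shutdown();
        return memoryManager.verifyEmpty(); // must not throw after shutdown
    }
}
```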

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##########
@@ -134,6 +143,12 @@ public TaskSlotTableImpl(
 		slotActions = null;
 		state = State.CREATED;
 		closingFuture = new CompletableFuture<>();
+
+		asyncExecutor = new ThreadPoolExecutor(

Review comment:
       What spoke against using the `ioExecutor` in `TaskManagerSharedServices` when creating the `TaskSlotTableImpl`?
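
The alternative raised here would have the table borrow an executor that is owned elsewhere rather than create its own `ThreadPoolExecutor`. Below is a minimal sketch of that constructor-injection shape. `SharedServicesSketch` and `SlotTableSketch` are hypothetical stand-ins for `TaskManagerSharedServices` and `TaskSlotTableImpl`, and the fixed-size pool is an arbitrary choice for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SharedIoExecutorSketch {

    /** Hypothetical stand-in for TaskManagerSharedServices: owns the I/O pool's lifecycle. */
    static final class SharedServicesSketch implements AutoCloseable {
        final ExecutorService ioExecutor = Executors.newFixedThreadPool(2);

        @Override
        public void close() {
            ioExecutor.shutdown();
        }
    }

    /** Hypothetical stand-in for TaskSlotTableImpl: borrows an executor, never shuts it down. */
    static final class SlotTableSketch implements AutoCloseable {
        private final Executor memoryVerificationExecutor;

        SlotTableSketch(Executor memoryVerificationExecutor) {
            this.memoryVerificationExecutor = memoryVerificationExecutor;
        }

        CompletableFuture<Void> verifyInBackground(Runnable check) {
            return CompletableFuture.runAsync(check, memoryVerificationExecutor);
        }

        @Override
        public void close() {
            // Nothing to shut down: the executor belongs to the shared services.
        }
    }

    /** Closes a table, then checks that the shared executor still accepts work. */
    static boolean sharedExecutorOutlivesTable() {
        try (SharedServicesSketch services = new SharedServicesSketch()) {
            SlotTableSketch table = new SlotTableSketch(services.ioExecutor);
            table.verifyInBackground(() -> { }).get(10, TimeUnit.SECONDS);
            table.close();
            return CompletableFuture.supplyAsync(() -> true, services.ioExecutor)
                .get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```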

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
##########
@@ -295,22 +303,25 @@ public String toString() {
 				// and set the slot state to releasing so that it gets eventually freed
 				tasks.values().forEach(task -> task.failExternally(cause));
 			}
+
 			final CompletableFuture<Void> cleanupFuture = FutureUtils
 				.waitForAll(tasks.values().stream().map(TaskSlotPayload::getTerminationFuture).collect(Collectors.toList()))
-				.thenRun(() -> {
-					verifyMemoryFreed();
-					this.memoryManager.shutdown();
-				});
-
+				.thenRun(memoryManager::shutdown);
+			verifyAllManagedMemoryIsReleasedAfter(cleanupFuture);
 			FutureUtils.forward(cleanupFuture, closingFuture);
 		}
 		return closingFuture;
 	}
 
-	private void verifyMemoryFreed() {
-		if (!memoryManager.verifyEmpty()) {
-			LOG.warn("Not all slot memory is freed, potential memory leak at {}", this);
-		}
+	private void verifyAllManagedMemoryIsReleasedAfter(CompletableFuture<Void> after) {
+		FutureUtils.runAfterwardsAsync(
+			after,
+			() -> {
+				if (!memoryManager.verifyEmpty()) {
+					LOG.warn("Not all slot memory is freed, potential memory leak at {}", this);
+				}
+			},
+			asyncExecutor);

Review comment:
       Does it make sense to only run this action if `after` has completed normally, e.g. by using `after.thenRunAsync(() -> ..., asyncExecutor)`?
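
With a plain `CompletableFuture`, `thenRunAsync` runs its action only if the source future completed normally. If the source completed exceptionally, the dependent stage fails with a `CompletionException` that wraps the cause, and the action is skipped. `whenCompleteAsync` runs its action in both cases. The sketch contrasts the two and uses `whenCompleteAsync` as a stand-in for Flink's `FutureUtils.runAfterwardsAsync`. Judging from its use here, that helper also runs whatever way `after` completed, but this is an assumption. The `Runnable` merely records that it ran, in place of the real `memoryManager.verifyEmpty()` check.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class RunAfterSketch {

    /** Whether thenRunAsync ran its action, given how 'after' completed. */
    static boolean thenRunAsyncRuns(boolean failAfter) {
        return runsAction(failAfter, true);
    }

    /** Whether whenCompleteAsync ran its action, given how 'after' completed. */
    static boolean whenCompleteAsyncRuns(boolean failAfter) {
        return runsAction(failAfter, false);
    }

    private static boolean runsAction(boolean failAfter, boolean onlyOnSuccess) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Void> after = new CompletableFuture<>();
            AtomicBoolean ran = new AtomicBoolean();
            Runnable verify = () -> ran.set(true); // placeholder for the leak check

            CompletableFuture<Void> result = onlyOnSuccess
                ? after.thenRunAsync(verify, executor)                       // skipped on failure
                : after.whenCompleteAsync((v, t) -> verify.run(), executor); // runs either way

            if (failAfter) {
                after.completeExceptionally(new RuntimeException("task termination failed"));
            } else {
                after.complete(null);
            }
            // Wait for the dependent stage and swallow the failure it may propagate.
            result.handle((v, t) -> null).get(10, TimeUnit.SECONDS);
            return ran.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }
}
```

In this PR, `after` is `cleanupFuture`. `thenRun(memoryManager::shutdown)` propagates a failure from `waitForAll`, so if a task's termination future fails, `cleanupFuture` presumably completes exceptionally. With `thenRunAsync`, the leak warning would then be skipped in exactly that case.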

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
##########
@@ -84,15 +85,22 @@
 	/** The closing future is completed when the slot is freed and closed. */
 	private final CompletableFuture<Void> closingFuture;
 
+	/**
+	 * {@link ExecutorService} for background actions, e.g. verify all managed memory released.
+	 */
+	private final ExecutorService asyncExecutor;

Review comment:
       ```suggestion
   	private final ExecutorService memoryVerificationExecutor;
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##########
@@ -107,6 +111,11 @@
 		"TaskSlotTableImpl is not initialized with proper main thread executor, " +
 			"call to TaskSlotTableImpl#start is required");
 
+	/**
+	 * {@link ExecutorService} for background actions, e.g. verify all managed memory released.
+	 */
+	private final ExecutorService asyncExecutor;

Review comment:
       ```suggestion
   	private final ExecutorService memoryVerificationExecutor;
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org