You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/08/13 08:20:38 UTC

[flink] branch release-1.10 updated: [FLINK-18646] Verify memory manager empty in a separate thread with larger timeout

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 3e17833  [FLINK-18646] Verify memory manager empty in a separate thread with larger timeout
3e17833 is described below

commit 3e17833eec5f5062260eaef2cc06421dabbd7f56
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Tue Jul 21 19:17:40 2020 +0300

    [FLINK-18646] Verify memory manager empty in a separate thread with larger timeout
    
    UnsafeMemoryBudget#verifyEmpty, called on slot freeing, needs time to wait for GC of all allocated/released managed memory.
    If there are a lot of segments to GC, it can take a long time to finish the check. If slot freeing happens in the RPC thread,
    the GC wait can block it and the TM risks missing its heartbeat.
    
    Another problem is that after UnsafeMemoryBudget#RETRIGGER_GC_AFTER_SLEEPS sleeps, System.gc() is called on each attempt to run a cleaner,
    even if cleaners ready to run have already been detected. This triggers a lot of unnecessary GCs in the background.
    
    The PR offloads the verification to a separate thread and calls System.gc() only if memory cannot be reserved and
    there are still no cleaners to run after a long wait. The timeout for normal memory reservation is increased to 2 seconds.
    The full reservation, used for verification, gets a 2-minute timeout.
    
    This closes #12980.
---
 .../flink/runtime/memory/UnsafeMemoryBudget.java   | 41 +++++++++++++++-------
 .../runtime/taskexecutor/TaskManagerServices.java  |  9 +++--
 .../flink/runtime/taskexecutor/slot/TaskSlot.java  | 34 +++++++++++-------
 .../taskexecutor/slot/TaskSlotTableImpl.java       | 13 +++++--
 .../flink/runtime/memory/MemoryManagerTest.java    | 10 ++++++
 .../runtime/taskexecutor/TaskExecutorTest.java     | 16 +++++++--
 .../runtime/taskexecutor/slot/TaskSlotTest.java    |  3 +-
 .../runtime/taskexecutor/slot/TaskSlotUtils.java   |  4 ++-
 8 files changed, 97 insertions(+), 33 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
index a85f40e..8063cd7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
@@ -35,11 +35,8 @@ import java.util.concurrent.atomic.AtomicLong;
  * continues to process any ready cleaners making {@link #MAX_SLEEPS} attempts before throwing {@link OutOfMemoryError}.
  */
 class UnsafeMemoryBudget {
-	// max. number of sleeps during try-reserving with exponentially
-	// increasing delay before throwing OutOfMemoryError:
-	// 1, 2, 4, 8, 16, 32, 64, 128, 256, 512 (total 1023 ms ~ 1 s)
-	// which means that MemoryReservationException will be thrown after 1 s of trying
-	private static final int MAX_SLEEPS = 10;
+	private static final int MAX_SLEEPS = 11; // 2^11 - 1 = (2 x 1024) - 1 ms ~ 2 s total sleep duration
+	private static final int MAX_SLEEPS_VERIFY_EMPTY = 17; // 2^17 - 1 = (128 x 1024) - 1 ms ~ 2 min total sleep duration
 	private static final int RETRIGGER_GC_AFTER_SLEEPS = 9; // ~ 0.5 sec
 
 	private final long totalMemorySize;
@@ -61,7 +58,9 @@ class UnsafeMemoryBudget {
 
 	boolean verifyEmpty() {
 		try {
-			reserveMemory(totalMemorySize);
+			// we wait longer than during the normal reserveMemory as we have to GC all memory,
+			// allocated by task, to perform the verification
+			reserveMemory(totalMemorySize, MAX_SLEEPS_VERIFY_EMPTY);
 		} catch (MemoryReservationException e) {
 			return false;
 		}
@@ -74,8 +73,26 @@ class UnsafeMemoryBudget {
 	 *
 	 * <p>Adjusted version of {@link java.nio.Bits#reserveMemory(long, int)} taken from Java 11.
 	 */
-	@SuppressWarnings({"OverlyComplexMethod", "JavadocReference", "NestedTryStatement"})
 	void reserveMemory(long size) throws MemoryReservationException {
+		reserveMemory(size, MAX_SLEEPS);
+	}
+
+	/**
+	 * Reserve memory of certain size if it is available.
+	 *
+	 * <p>If the method cannot reserve immediately, it tries to process the phantom GC cleaners queue by
+	 * calling {@link JavaGcCleanerWrapper#tryRunPendingCleaners()}. If it does not help,
+	 * the method calls {@link System#gc} and tries again to reserve. If it still cannot reserve,
+	 * it tries to process the phantom GC cleaners queue. If there are no cleaners to process,
+	 * the method sleeps the {@code maxSleeps} number of times, starting 1 ms and each time doubling
+	 * the sleeping duration: 1 (0), 2 (1), 4 (2), 8 (3), 16 (4), 32 (5), 64 (6), 128 (7), 256 (8), 512 (9), ...
+	 * After the {@code RETRIGGER_GC_AFTER_SLEEPS} sleeps, the method also calls {@link System#gc} before sleeping.
+	 * After the {@code maxSleeps} being unable to reserve, the {@link MemoryReservationException} is thrown.
+	 *
+	 * <p>Adjusted version of {@link java.nio.Bits#reserveMemory(long, int)} taken from Java 11.
+	 */
+	@SuppressWarnings({"OverlyComplexMethod", "JavadocReference", "NestedTryStatement"})
+	void reserveMemory(long size, int maxSleeps) throws MemoryReservationException {
 		long availableOrReserved = tryReserveMemory(size);
 		// optimist!
 		if (availableOrReserved >= size) {
@@ -122,15 +139,15 @@ class UnsafeMemoryBudget {
 				if (availableOrReserved >= size) {
 					return;
 				}
-				if (sleeps >= MAX_SLEEPS) {
+				if (sleeps >= maxSleeps) {
 					break;
 				}
-				if (sleeps >= RETRIGGER_GC_AFTER_SLEEPS) {
-					// trigger again VM's Reference processing if we have to wait longer
-					System.gc();
-				}
 				try {
 					if (!JavaGcCleanerWrapper.tryRunPendingCleaners()) {
+						if (sleeps >= RETRIGGER_GC_AFTER_SLEEPS) {
+							// trigger again VM's Reference processing if we have to wait longer
+							System.gc();
+						}
 						Thread.sleep(sleepTime);
 						sleepTime <<= 1;
 						sleeps++;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index a443fb4..5a32319 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -247,7 +247,8 @@ public class TaskManagerServices {
 			taskManagerServicesConfiguration.getNumberOfSlots(),
 			taskManagerServicesConfiguration.getTaskExecutorResourceSpec(),
 			taskManagerServicesConfiguration.getTimerServiceShutdownTimeout(),
-			taskManagerServicesConfiguration.getPageSize());
+			taskManagerServicesConfiguration.getPageSize(),
+			ioExecutor);
 
 		final JobManagerTable jobManagerTable = new JobManagerTable();
 
@@ -284,7 +285,8 @@ public class TaskManagerServices {
 			final int numberOfSlots,
 			final TaskExecutorResourceSpec taskExecutorResourceSpec,
 			final long timerServiceShutdownTimeout,
-			final int pageSize) {
+			final int pageSize,
+			final Executor memoryVerificationExecutor) {
 		final TimerService<AllocationID> timerService = new TimerService<>(
 			new ScheduledThreadPoolExecutor(1),
 			timerServiceShutdownTimeout);
@@ -293,7 +295,8 @@ public class TaskManagerServices {
 			TaskExecutorResourceUtils.generateTotalAvailableResourceProfile(taskExecutorResourceSpec),
 			TaskExecutorResourceUtils.generateDefaultSlotResourceProfile(taskExecutorResourceSpec, numberOfSlots),
 			pageSize,
-			timerService);
+			timerService,
+			memoryVerificationExecutor);
 	}
 
 	private static ShuffleEnvironment<?, ?> createShuffleEnvironment(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
index 1dc569f..82fc364 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
@@ -35,6 +35,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.stream.Collectors;
 
 /**
@@ -84,15 +85,22 @@ public class TaskSlot<T extends TaskSlotPayload> implements AutoCloseableAsync {
 	/** The closing future is completed when the slot is freed and closed. */
 	private final CompletableFuture<Void> closingFuture;
 
+	/**
+	 * {@link Executor} for background actions, e.g. verify all managed memory released.
+	 */
+	private final Executor asyncExecutor;
+
 	public TaskSlot(
 		final int index,
 		final ResourceProfile resourceProfile,
 		final int memoryPageSize,
 		final JobID jobId,
-		final AllocationID allocationId) {
+		final AllocationID allocationId,
+		final Executor asyncExecutor) {
 
 		this.index = index;
 		this.resourceProfile = Preconditions.checkNotNull(resourceProfile);
+		this.asyncExecutor = Preconditions.checkNotNull(asyncExecutor);
 
 		this.tasks = new HashMap<>(4);
 		this.state = TaskSlotState.ALLOCATED;
@@ -295,22 +303,24 @@ public class TaskSlot<T extends TaskSlotPayload> implements AutoCloseableAsync {
 				// and set the slot state to releasing so that it gets eventually freed
 				tasks.values().forEach(task -> task.failExternally(cause));
 			}
-			final CompletableFuture<Void> cleanupFuture = FutureUtils
-				.waitForAll(tasks.values().stream().map(TaskSlotPayload::getTerminationFuture).collect(Collectors.toList()))
-				.thenRun(() -> {
-					verifyMemoryFreed();
-					this.memoryManager.shutdown();
-				});
 
-			FutureUtils.forward(cleanupFuture, closingFuture);
+			final CompletableFuture<Void> shutdownFuture = FutureUtils
+				.waitForAll(tasks.values().stream().map(TaskSlotPayload::getTerminationFuture).collect(Collectors.toList()))
+				.thenRun(memoryManager::shutdown);
+			verifyAllManagedMemoryIsReleasedAfter(shutdownFuture);
+			FutureUtils.forward(shutdownFuture, closingFuture);
 		}
 		return closingFuture;
 	}
 
-	private void verifyMemoryFreed() {
-		if (!memoryManager.verifyEmpty()) {
-			LOG.warn("Not all slot memory is freed, potential memory leak at {}", this);
-		}
+	private void verifyAllManagedMemoryIsReleasedAfter(CompletableFuture<Void> after) {
+		after.thenRunAsync(
+			() -> {
+				if (!memoryManager.verifyEmpty()) {
+					LOG.warn("Not all slot memory is freed, potential memory leak at {}", this);
+				}
+			},
+			asyncExecutor);
 	}
 
 	private static MemoryManager createMemoryManager(ResourceProfile resourceProfile, int pageSize) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
index 10633c3..98d6aa1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
@@ -53,6 +53,7 @@ import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.stream.Collectors;
 
 /**
@@ -107,12 +108,18 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> implements TaskSlotTab
 		"TaskSlotTableImpl is not initialized with proper main thread executor, " +
 			"call to TaskSlotTableImpl#start is required");
 
+	/**
+	 * {@link Executor} for background actions, e.g. verify all managed memory released.
+	 */
+	private final Executor memoryVerificationExecutor;
+
 	public TaskSlotTableImpl(
 			final int numberSlots,
 			final ResourceProfile totalAvailableResourceProfile,
 			final ResourceProfile defaultSlotResourceProfile,
 			final int memoryPageSize,
-			final TimerService<AllocationID> timerService) {
+			final TimerService<AllocationID> timerService,
+			final Executor memoryVerificationExecutor) {
 		Preconditions.checkArgument(0 < numberSlots, "The number of task slots must be greater than 0.");
 
 		this.numberSlots = numberSlots;
@@ -134,6 +141,8 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> implements TaskSlotTab
 		slotActions = null;
 		state = State.CREATED;
 		closingFuture = new CompletableFuture<>();
+
+		this.memoryVerificationExecutor = memoryVerificationExecutor;
 	}
 
 	@Override
@@ -289,7 +298,7 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> implements TaskSlotTab
 			return false;
 		}
 
-		taskSlot = new TaskSlot<>(index, resourceProfile, memoryPageSize, jobId, allocationId);
+		taskSlot = new TaskSlot<>(index, resourceProfile, memoryPageSize, jobId, allocationId, memoryVerificationExecutor);
 		if (index >= 0) {
 			taskSlots.put(index, taskSlot);
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
index 5297525..2f2aca2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
@@ -364,6 +364,16 @@ public class MemoryManagerTest {
 		memoryManager.computeMemorySize(-0.1);
 	}
 
+	@Test
+	public void testVerifyEmptyCanBeDoneAfterShutdown() throws MemoryAllocationException, MemoryReservationException {
+		memoryManager.release(memoryManager.allocatePages(new Object(), 1));
+		Object owner = new Object();
+		memoryManager.reserveMemory(owner, MemoryManager.DEFAULT_PAGE_SIZE);
+		memoryManager.releaseMemory(owner, MemoryManager.DEFAULT_PAGE_SIZE);
+		memoryManager.shutdown();
+		memoryManager.verifyEmpty();
+	}
+
 	private void testCannotAllocateAnymore(Object owner, int numPages) {
 		try {
 			memoryManager.allocatePages(owner, numPages);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index f0dbbd1..96d178a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -2069,7 +2069,13 @@ public class TaskExecutorTest extends TestLogger {
 		private final OneShotLatch allocateSlotLatch;
 
 		private AllocateSlotNotifyingTaskSlotTable(OneShotLatch allocateSlotLatch) {
-			super(1, createTotalResourceProfile(1), DEFAULT_RESOURCE_PROFILE, MemoryManager.MIN_PAGE_SIZE, createDefaultTimerService(timeout.toMilliseconds()));
+			super(
+				1,
+				createTotalResourceProfile(1),
+				DEFAULT_RESOURCE_PROFILE,
+				MemoryManager.MIN_PAGE_SIZE,
+				createDefaultTimerService(timeout.toMilliseconds()),
+				Executors.newDirectExecutorService());
 			this.allocateSlotLatch = allocateSlotLatch;
 		}
 
@@ -2095,7 +2101,13 @@ public class TaskExecutorTest extends TestLogger {
 		private final CountDownLatch slotsToActivate;
 
 		private ActivateSlotNotifyingTaskSlotTable(int numberOfDefaultSlots, CountDownLatch slotsToActivate) {
-			super(numberOfDefaultSlots, createTotalResourceProfile(numberOfDefaultSlots), DEFAULT_RESOURCE_PROFILE, MemoryManager.MIN_PAGE_SIZE, createDefaultTimerService(timeout.toMilliseconds()));
+			super(
+				numberOfDefaultSlots,
+				createTotalResourceProfile(numberOfDefaultSlots),
+				DEFAULT_RESOURCE_PROFILE,
+				MemoryManager.MIN_PAGE_SIZE,
+				createDefaultTimerService(timeout.toMilliseconds()),
+				Executors.newDirectExecutorService());
 			this.slotsToActivate = slotsToActivate;
 		}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTest.java
index e922c9e..b08f1a6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.taskexecutor.slot;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.util.TestLogger;
@@ -55,6 +56,6 @@ public class TaskSlotTest extends TestLogger {
 	}
 
 	private static <T extends TaskSlotPayload> TaskSlot<T> createTaskSlot() {
-		return new TaskSlot<>(0, ResourceProfile.ZERO, MemoryManager.MIN_PAGE_SIZE, JOB_ID, ALLOCATION_ID);
+		return new TaskSlot<>(0, ResourceProfile.ZERO, MemoryManager.MIN_PAGE_SIZE, JOB_ID, ALLOCATION_ID, Executors.newDirectExecutorService());
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotUtils.java
index 38d0c97..e0a4ca1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotUtils.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 
@@ -59,7 +60,8 @@ public enum TaskSlotUtils {
 			createTotalResourceProfile(numberOfSlots),
 			DEFAULT_RESOURCE_PROFILE,
 			MemoryManager.MIN_PAGE_SIZE,
-			timerService);
+			timerService,
+			Executors.newDirectExecutorService());
 	}
 
 	public static ResourceProfile createTotalResourceProfile(int numberOfSlots) {