You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by az...@apache.org on 2020/08/31 08:05:13 UTC

[flink] 02/02: [FLINK-19055] Wait less time for all memory GC in tests (MemoryManager#verifyEmpty)

This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dd1ce59ebc53650dbeb42586d1fff95cd1b9d2c4
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Thu Aug 27 17:54:28 2020 +0200

    [FLINK-19055] Wait less time for all memory GC in tests (MemoryManager#verifyEmpty)
    
    In production, the waiting happens in a separate thread and does not disrupt anything.
    The problem is that tests use the same method, which blocks the testing thread for too long.
    We do not need such a long waiting time in tests. This PR refactors the code to restore the shorter waiting time in tests.
    
    This closes #13265.
---
 .../apache/flink/runtime/memory/MemoryManager.java    | 11 ++++++++---
 .../flink/runtime/memory/UnsafeMemoryBudget.java      |  9 ++++++---
 .../flink/runtime/memory/MemoryManagerBuilder.java    |  2 +-
 .../flink/runtime/memory/UnsafeMemoryBudgetTest.java  | 19 ++++++++++++-------
 4 files changed, 27 insertions(+), 14 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
index 799ac9d..b70049e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
@@ -89,12 +89,14 @@ public class MemoryManager {
 	 *
 	 * @param memorySize The total size of the off-heap memory to be managed by this memory manager.
 	 * @param pageSize The size of the pages handed out by the memory manager.
+	 * @param verifyEmptyWaitGcMaxSleeps defines how long to wait for GC of all allocated memory to check for memory leaks,
+	 *                                   see {@link UnsafeMemoryBudget} for details.
 	 */
-	MemoryManager(long memorySize, int pageSize) {
+	MemoryManager(long memorySize, int pageSize, int verifyEmptyWaitGcMaxSleeps) {
 		sanityCheck(memorySize, pageSize);
 
 		this.pageSize = pageSize;
-		this.memoryBudget = new UnsafeMemoryBudget(memorySize);
+		this.memoryBudget = new UnsafeMemoryBudget(memorySize, verifyEmptyWaitGcMaxSleeps);
 		this.totalNumberOfPages = memorySize / pageSize;
 		this.allocatedSegments = new ConcurrentHashMap<>();
 		this.reservedMemory = new ConcurrentHashMap<>();
@@ -629,10 +631,13 @@ public class MemoryManager {
 	/**
 	 * Creates a memory manager with the given capacity and given page size.
 	 *
+	 * <p>This is the production version of MemoryManager, which waits for the longest time
+	 * to check for memory leaks ({@link #verifyEmpty()}) once the owner of the MemoryManager is ready to dispose of it.
+	 *
 	 * @param memorySize The total size of the off-heap memory to be managed by this memory manager.
 	 * @param pageSize The size of the pages handed out by the memory manager.
 	 */
 	public static MemoryManager create(long memorySize, int pageSize) {
-		return new MemoryManager(memorySize, pageSize);
+		return new MemoryManager(memorySize, pageSize, UnsafeMemoryBudget.MAX_SLEEPS_VERIFY_EMPTY);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
index 96888f6..a7361b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
@@ -36,16 +36,19 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 class UnsafeMemoryBudget {
 	private static final int MAX_SLEEPS = 11; // 2^11 - 1 = (2 x 1024) - 1 ms ~ 2 s total sleep duration
-	private static final int MAX_SLEEPS_VERIFY_EMPTY = 17; // 2^17 - 1 = (128 x 1024) - 1 ms ~ 2 min total sleep duration
+	static final int MAX_SLEEPS_VERIFY_EMPTY = 17; // 2^17 - 1 = (128 x 1024) - 1 ms ~ 2 min total sleep duration
 	private static final int RETRIGGER_GC_AFTER_SLEEPS = 9; // ~ 0.5 sec
 
 	private final long totalMemorySize;
 
 	private final AtomicLong availableMemorySize;
 
-	UnsafeMemoryBudget(long totalMemorySize) {
+	private final int verifyEmptyWaitGcMaxSleeps;
+
+	UnsafeMemoryBudget(long totalMemorySize, int verifyEmptyWaitGcMaxSleeps) {
 		this.totalMemorySize = totalMemorySize;
 		this.availableMemorySize = new AtomicLong(totalMemorySize);
+		this.verifyEmptyWaitGcMaxSleeps = verifyEmptyWaitGcMaxSleeps;
 	}
 
 	long getTotalMemorySize() {
@@ -60,7 +63,7 @@ class UnsafeMemoryBudget {
 		try {
 			// we wait longer than during the normal reserveMemory as we have to GC all memory,
 			// allocated by task, to perform the verification
-			reserveMemory(totalMemorySize, MAX_SLEEPS_VERIFY_EMPTY);
+			reserveMemory(totalMemorySize, verifyEmptyWaitGcMaxSleeps);
 		} catch (MemoryReservationException e) {
 			return false;
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerBuilder.java
index 91599ac..ba1e862 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerBuilder.java
@@ -40,7 +40,7 @@ public class MemoryManagerBuilder {
 	}
 
 	public MemoryManager build() {
-		return new MemoryManager(memorySize, pageSize);
+		return new MemoryManager(memorySize, pageSize, UnsafeMemoryBudgetTest.MAX_SLEEPS_VERIFY_EMPTY_FOR_TESTS);
 	}
 
 	public static MemoryManagerBuilder newBuilder() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/UnsafeMemoryBudgetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/UnsafeMemoryBudgetTest.java
index 4f6edd8..5ee9104 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/UnsafeMemoryBudgetTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/UnsafeMemoryBudgetTest.java
@@ -28,29 +28,30 @@ import static org.junit.Assert.assertThat;
 
 /** Test suite for {@link UnsafeMemoryBudget}. */
 public class UnsafeMemoryBudgetTest extends TestLogger {
+	static final int MAX_SLEEPS_VERIFY_EMPTY_FOR_TESTS = 10; // 2^10 - 1 = (1 x 1024) - 1 ms ~ 1 s total sleep duration
 
 	@Test
 	public void testGetTotalMemory() {
-		UnsafeMemoryBudget budget = new UnsafeMemoryBudget(100L);
+		UnsafeMemoryBudget budget = createUnsafeMemoryBudget();
 		assertThat(budget.getTotalMemorySize(), is(100L));
 	}
 
 	@Test
 	public void testReserveMemory() throws MemoryReservationException {
-		UnsafeMemoryBudget budget = new UnsafeMemoryBudget(100L);
+		UnsafeMemoryBudget budget = createUnsafeMemoryBudget();
 		budget.reserveMemory(50L);
 		assertThat(budget.getAvailableMemorySize(), is(50L));
 	}
 
 	@Test(expected = MemoryReservationException.class)
 	public void testReserveMemoryOverLimitFails() throws MemoryReservationException {
-		UnsafeMemoryBudget budget = new UnsafeMemoryBudget(100L);
+		UnsafeMemoryBudget budget = createUnsafeMemoryBudget();
 		budget.reserveMemory(120L);
 	}
 
 	@Test
 	public void testReleaseMemory() throws MemoryReservationException {
-		UnsafeMemoryBudget budget = new UnsafeMemoryBudget(100L);
+		UnsafeMemoryBudget budget = createUnsafeMemoryBudget();
 		budget.reserveMemory(50L);
 		budget.releaseMemory(30L);
 		assertThat(budget.getAvailableMemorySize(), is(80L));
@@ -58,14 +59,14 @@ public class UnsafeMemoryBudgetTest extends TestLogger {
 
 	@Test(expected = IllegalStateException.class)
 	public void testReleaseMemoryMoreThanReservedFails() throws MemoryReservationException {
-		UnsafeMemoryBudget budget = new UnsafeMemoryBudget(100L);
+		UnsafeMemoryBudget budget = createUnsafeMemoryBudget();
 		budget.reserveMemory(50L);
 		budget.releaseMemory(70L);
 	}
 
 	@Test(expected = MemoryReservationException.class)
 	public void testReservationFailsIfOwnerNotGced() throws MemoryReservationException {
-		UnsafeMemoryBudget budget = new UnsafeMemoryBudget(100L);
+		UnsafeMemoryBudget budget = createUnsafeMemoryBudget();
 		Object memoryOwner = new Object();
 		budget.reserveMemory(50L);
 		JavaGcCleanerWrapper.createCleaner(memoryOwner, () -> budget.releaseMemory(50L));
@@ -76,10 +77,14 @@ public class UnsafeMemoryBudgetTest extends TestLogger {
 
 	@Test
 	public void testReservationSuccessIfOwnerGced() throws MemoryReservationException {
-		UnsafeMemoryBudget budget = new UnsafeMemoryBudget(100L);
+		UnsafeMemoryBudget budget = createUnsafeMemoryBudget();
 		budget.reserveMemory(50L);
 		JavaGcCleanerWrapper.createCleaner(new Object(), () -> budget.releaseMemory(50L));
 		budget.reserveMemory(60L);
 		assertThat(budget.getAvailableMemorySize(), is(40L));
 	}
+
+	private static UnsafeMemoryBudget createUnsafeMemoryBudget() {
+		return new UnsafeMemoryBudget(100L, MAX_SLEEPS_VERIFY_EMPTY_FOR_TESTS);
+	}
 }