You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by az...@apache.org on 2020/05/19 07:46:52 UTC

[flink] branch release-1.10 updated (247ab12 -> b8fcdd6)

This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 247ab12  [FLINK-16160][table] Fix proctime()/rowtime() doesn't work for TableEnvironment.connect().createTemporaryTable()
     new e70f585  [FLINK-15758][MemManager] Remove KeyedBudgetManager and use AtomicLong
     new 33ba6be  [FLINK-15758][MemManager] Remove MemoryManager#AllocationRequest
     new 2a45209  [FLINK-15758][MemManager] Release segment and its unsafe memory in GC Cleaner
     new b8fcdd6  [hotfix] remove IntelliJ '//noinspection ...' directives from MemoryManager

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/core/memory/HybridMemorySegment.java     |  27 +-
 .../flink/core/memory/MemorySegmentFactory.java    |  17 +-
 .../org/apache/flink/core/memory/MemoryType.java   |  38 --
 .../org/apache/flink/core/memory/MemoryUtils.java  |   7 +-
 .../apache/flink/util/JavaGcCleanerWrapper.java    | 413 +++++++++++++-------
 .../flink/core/memory/CrossSegmentTypeTest.java    |   2 +-
 .../flink/core/memory/EndiannessAccessChecks.java  |   2 +-
 .../HybridOffHeapUnsafeMemorySegmentTest.java      |   4 +-
 .../flink/core/memory/MemorySegmentChecksTest.java |   4 +-
 .../core/memory/MemorySegmentUndersizedTest.java   |   4 +-
 .../core/memory/OperationsOnFreedSegmentTest.java  |   2 +-
 .../flink/util/JavaGcCleanerWrapperTest.java       |   2 +-
 .../python/AbstractPythonFunctionOperator.java     |   6 +-
 .../apache/flink/runtime/memory/MemoryManager.java | 414 +++++----------------
 .../flink/runtime/memory/UnsafeMemoryBudget.java   | 183 +++++++++
 .../flink/runtime/taskexecutor/slot/TaskSlot.java  |   6 +-
 .../flink/runtime/util/KeyedBudgetManager.java     | 294 ---------------
 .../flink/runtime/memory/MemoryManagerBuilder.java |  19 +-
 .../memory/MemoryManagerSharedResourcesTest.java   |  10 +-
 .../flink/runtime/memory/MemoryManagerTest.java    | 164 ++++----
 .../runtime/memory/UnsafeMemoryBudgetTest.java     |  85 +++++
 .../operators/testutils/MockEnvironment.java       |   3 +-
 .../flink/runtime/util/KeyedBudgetManagerTest.java | 262 -------------
 .../runtime/tasks/StreamMockEnvironment.java       |   3 +-
 24 files changed, 790 insertions(+), 1181 deletions(-)
 delete mode 100644 flink-core/src/main/java/org/apache/flink/core/memory/MemoryType.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
 delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyedBudgetManager.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/memory/UnsafeMemoryBudgetTest.java
 delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyedBudgetManagerTest.java


[flink] 01/04: [FLINK-15758][MemManager] Remove KeyedBudgetManager and use AtomicLong

Posted by az...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e70f5853895cfa96c665b2f7bec0e62216f7f507
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Thu Jan 30 14:19:39 2020 +0100

    [FLINK-15758][MemManager] Remove KeyedBudgetManager and use AtomicLong
---
 .../org/apache/flink/core/memory/MemoryType.java   |  38 ---
 .../python/AbstractPythonFunctionOperator.java     |   6 +-
 .../apache/flink/runtime/memory/MemoryManager.java | 348 +++++++++------------
 .../flink/runtime/taskexecutor/slot/TaskSlot.java  |   6 +-
 .../flink/runtime/util/KeyedBudgetManager.java     | 294 -----------------
 .../flink/runtime/memory/MemoryManagerBuilder.java |  19 +-
 .../memory/MemoryManagerSharedResourcesTest.java   |  10 +-
 .../flink/runtime/memory/MemoryManagerTest.java    | 112 +++----
 .../operators/testutils/MockEnvironment.java       |   3 +-
 .../flink/runtime/util/KeyedBudgetManagerTest.java | 262 ----------------
 .../runtime/tasks/StreamMockEnvironment.java       |   3 +-
 11 files changed, 190 insertions(+), 911 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemoryType.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemoryType.java
deleted file mode 100644
index 804f00d..0000000
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemoryType.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.core.memory;
-
-import org.apache.flink.annotation.Internal;
-
-/**
- * The class of memory, such as heap or off-heap.
- */
-@Internal
-public enum MemoryType {
-
-	/**
-	 * Denotes memory that is part of the Java heap.
-	 */
-	HEAP,
-
-	/**
-	 * Denotes memory that is outside the Java heap (but still part of tha Java process).
-	 */
-	OFF_HEAP
-}
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
index 33fb35d..6f94043 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.api.operators.python;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.python.PythonConfig;
 import org.apache.flink.python.PythonFunctionRunner;
 import org.apache.flink.python.PythonOptions;
@@ -172,8 +171,7 @@ public abstract class AbstractPythonFunctionOperator<IN, OUT>
 				pythonFunctionRunner = null;
 			}
 			if (reservedMemory > 0) {
-				getContainingTask().getEnvironment().getMemoryManager().releaseMemory(
-					this, MemoryType.OFF_HEAP, reservedMemory);
+				getContainingTask().getEnvironment().getMemoryManager().releaseMemory(this, reservedMemory);
 				reservedMemory = -1;
 			}
 		} finally {
@@ -277,7 +275,7 @@ public abstract class AbstractPythonFunctionOperator<IN, OUT>
 		long availableManagedMemory = memoryManager.computeMemorySize(
 			getOperatorConfig().getManagedMemoryFraction());
 		if (requiredPythonWorkerMemory <= availableManagedMemory) {
-			memoryManager.reserveMemory(this, MemoryType.OFF_HEAP, requiredPythonWorkerMemory);
+			memoryManager.reserveMemory(this, requiredPythonWorkerMemory);
 			LOG.info("Reserved memory {} for Python worker.", requiredPythonWorkerMemory);
 			this.reservedMemory = requiredPythonWorkerMemory;
 			// TODO enforce the memory limit of the Python worker
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
index 3bcfa7c..72df659 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
@@ -21,9 +21,6 @@ package org.apache.flink.runtime.memory;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.memory.HybridMemorySegment;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemoryType;
-import org.apache.flink.runtime.util.KeyedBudgetManager;
-import org.apache.flink.runtime.util.KeyedBudgetManager.AcquisitionResult;
 import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.LongFunctionWithException;
@@ -32,41 +29,34 @@ import org.apache.flink.util.function.ThrowingRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnegative;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.ConcurrentModificationException;
-import java.util.EnumMap;
-import java.util.EnumSet;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 
 import static org.apache.flink.core.memory.MemorySegmentFactory.allocateOffHeapUnsafeMemory;
-import static org.apache.flink.core.memory.MemorySegmentFactory.allocateUnpooledSegment;
 
 /**
- * The memory manager governs the memory that Flink uses for sorting, hashing, and caching. Memory is represented
- * either in {@link MemorySegment}s of equal size and arbitrary type or in reserved chunks of certain size and {@link MemoryType}.
- * Operators allocate the memory either by requesting a number of memory segments or by reserving chunks.
+ * The memory manager governs the memory that Flink uses for sorting, hashing, caching or off-heap state backends
+ * (e.g. RocksDB). Memory is represented either in {@link MemorySegment}s of equal size or in reserved chunks of certain
+ * size. Operators allocate the memory either by requesting a number of memory segments or by reserving chunks.
  * Any allocated memory has to be released to be reused later.
  *
- * <p>Which {@link MemoryType}s the MemoryManager serves and their total sizes can be passed as an argument
- * to the constructor.
- *
- * <p>The memory segments may be represented as on-heap byte arrays or as off-heap memory regions
- * (both via {@link HybridMemorySegment}). Releasing a memory segment will make it re-claimable
- * by the garbage collector.
+ * <p>The memory segments are represented as off-heap unsafe memory regions (via {@link HybridMemorySegment}).
+ * Releasing a memory segment will make it re-claimable by the garbage collector, but does not necessarily
+ * release the underlying memory immediately.
  */
 public class MemoryManager {
 
@@ -83,9 +73,15 @@ public class MemoryManager {
 	private final Map<Object, Set<MemorySegment>> allocatedSegments;
 
 	/** Reserved memory per memory owner. */
-	private final Map<Object, Map<MemoryType, Long>> reservedMemory;
+	private final Map<Object, Long> reservedMemory;
+
+	private final long pageSize;
+
+	private final long totalMemorySize;
+
+	private final long totalNumberOfPages;
 
-	private final KeyedBudgetManager<MemoryType> budgetByType;
+	private final AtomicLong availableMemorySize;
 
 	private final SharedResources sharedResources;
 
@@ -93,31 +89,30 @@ public class MemoryManager {
 	private volatile boolean isShutDown;
 
 	/**
-	 * Creates a memory manager with the given memory types, capacity and given page size.
+	 * Creates a memory manager with the given capacity and given page size.
 	 *
-	 * @param memorySizeByType The total size of the memory to be managed by this memory manager for each type (heap / off-heap).
+	 * @param memorySize The total size of the off-heap memory to be managed by this memory manager.
 	 * @param pageSize The size of the pages handed out by the memory manager.
 	 */
-	public MemoryManager(Map<MemoryType, Long> memorySizeByType, int pageSize) {
-		for (Entry<MemoryType, Long> sizeForType : memorySizeByType.entrySet()) {
-			sanityCheck(sizeForType.getValue(), pageSize, sizeForType.getKey());
-		}
+	public MemoryManager(long memorySize, int pageSize) {
+		sanityCheck(memorySize, pageSize);
 
+		this.pageSize = pageSize;
+		this.totalMemorySize = memorySize;
+		this.totalNumberOfPages = memorySize / pageSize;
 		this.allocatedSegments = new ConcurrentHashMap<>();
 		this.reservedMemory = new ConcurrentHashMap<>();
-		this.budgetByType = new KeyedBudgetManager<>(memorySizeByType, pageSize);
+		this.availableMemorySize = new AtomicLong(totalMemorySize);
 		this.sharedResources = new SharedResources();
-		verifyIntTotalNumberOfPages(memorySizeByType, budgetByType.maxTotalNumberOfPages());
+		verifyIntTotalNumberOfPages(totalMemorySize, totalNumberOfPages);
 
 		LOG.debug(
-			"Initialized MemoryManager with total memory size {} ({}), page size {}.",
-			budgetByType.totalAvailableBudget(),
-			memorySizeByType,
+			"Initialized MemoryManager with total memory size {} and page size {}.",
+			memorySize,
 			pageSize);
 	}
 
-	private static void sanityCheck(long memorySize, int pageSize, MemoryType memoryType) {
-		Preconditions.checkNotNull(memoryType);
+	private static void sanityCheck(long memorySize, int pageSize) {
 		Preconditions.checkArgument(memorySize >= 0L, "Size of total memory must be non-negative.");
 		Preconditions.checkArgument(
 			pageSize >= MIN_PAGE_SIZE,
@@ -127,12 +122,13 @@ public class MemoryManager {
 			"The given page size is not a power of two.");
 	}
 
-	private static void verifyIntTotalNumberOfPages(Map<MemoryType, Long> memorySizeByType, long numberOfPagesLong) {
+	private static void verifyIntTotalNumberOfPages(long memorySize, long numberOfPagesLong) {
 		Preconditions.checkArgument(
 			numberOfPagesLong <= Integer.MAX_VALUE,
-			"The given number of memory bytes (%d: %s) corresponds to more than MAX_INT pages.",
+			"The given number of memory bytes (%d) corresponds to more than MAX_INT pages (%d > %d).",
+			memorySize,
 			numberOfPagesLong,
-			memorySizeByType);
+			Integer.MAX_VALUE);
 	}
 
 	// ------------------------------------------------------------------------
@@ -150,7 +146,7 @@ public class MemoryManager {
 			// mark as shutdown and release memory
 			isShutDown = true;
 			reservedMemory.clear();
-			budgetByType.releaseAll();
+			availableMemorySize.set(totalMemorySize);
 
 			// go over all allocated segments and release them
 			for (Set<MemorySegment> segments : allocatedSegments.values()) {
@@ -179,7 +175,7 @@ public class MemoryManager {
 	 * @return True, if the memory manager is empty and valid, false if it is not empty or corrupted.
 	 */
 	public boolean verifyEmpty() {
-		return budgetByType.totalAvailableBudget() == budgetByType.maxTotalBudget();
+		return availableMemorySize.get() == totalMemorySize;
 	}
 
 	// ------------------------------------------------------------------------
@@ -189,8 +185,7 @@ public class MemoryManager {
 	/**
 	 * Allocates a set of memory segments from this memory manager.
 	 *
-	 * <p>The returned segments can have any memory type. The total allocated memory for each type will not exceed its
-	 * size limit, announced in the constructor.
+	 * <p>The total allocated memory will not exceed its size limit, announced in the constructor.
 	 *
 	 * @param owner The owner to associate with the memory segment, for the fallback release.
 	 * @param numPages The number of pages to allocate.
@@ -209,8 +204,7 @@ public class MemoryManager {
 	/**
 	 * Allocates a set of memory segments from this memory manager.
 	 *
-	 * <p>The allocated segments can have any memory type. The total allocated memory for each type will not exceed its
-	 * size limit, announced in the constructor.
+	 * <p>The total allocated memory will not exceed its size limit, announced in the constructor.
 	 *
 	 * @param owner The owner to associate with the memory segment, for the fallback release.
 	 * @param target The list into which to put the allocated memory pages.
@@ -226,7 +220,6 @@ public class MemoryManager {
 			int numberOfPages) throws MemoryAllocationException {
 		allocatePages(AllocationRequest
 			.newBuilder(owner)
-			.ofAllTypes()
 			.numberOfPages(numberOfPages)
 			.withOutput(target)
 			.build());
@@ -251,30 +244,31 @@ public class MemoryManager {
 		// sanity check
 		Preconditions.checkNotNull(owner, "The memory owner must not be null.");
 		Preconditions.checkState(!isShutDown, "Memory manager has been shut down.");
+		Preconditions.checkArgument(
+			numberOfPages <= totalNumberOfPages,
+			"Cannot allocate more segments %d than the max number %d",
+			numberOfPages,
+			totalNumberOfPages);
 
 		// reserve array space, if applicable
 		if (target instanceof ArrayList) {
 			((ArrayList<MemorySegment>) target).ensureCapacity(numberOfPages);
 		}
 
-		AcquisitionResult<MemoryType> acquiredBudget = budgetByType.acquirePagedBudget(request.getTypes(), numberOfPages);
-		if (acquiredBudget.isFailure()) {
-			throw new MemoryAllocationException(
-				String.format(
-					"Could not allocate %d pages. Only %d pages are remaining.",
-					numberOfPages,
-					acquiredBudget.getTotalAvailableForAllQueriedKeys()));
+		long memoryToReserve = numberOfPages * pageSize;
+		try {
+			reserveMemory(memoryToReserve);
+		} catch (MemoryReservationException e) {
+			throw new MemoryAllocationException(String.format("Could not allocate %d pages", numberOfPages), e);
 		}
 
 		allocatedSegments.compute(owner, (o, currentSegmentsForOwner) -> {
 			Set<MemorySegment> segmentsForOwner = currentSegmentsForOwner == null ?
 				new HashSet<>(numberOfPages) : currentSegmentsForOwner;
-			for (MemoryType memoryType : acquiredBudget.getAcquiredPerKey().keySet()) {
-				for (long i = acquiredBudget.getAcquiredPerKey().get(memoryType); i > 0; i--) {
-					MemorySegment segment = allocateManagedSegment(memoryType, owner);
-					target.add(segment);
-					segmentsForOwner.add(segment);
-				}
+			for (long i = numberOfPages; i > 0; i--) {
+				MemorySegment segment = allocateOffHeapUnsafeMemory(getPageSize(), owner);
+				target.add(segment);
+				segmentsForOwner.add(segment);
 			}
 			return segmentsForOwner;
 		});
@@ -289,10 +283,9 @@ public class MemoryManager {
 	 *
 	 * <p>If the segment has already been released, it is only freed. If it is null or has no owner, the request is simply ignored.
 	 * The segment is only freed and made eligible for reclamation by the GC. The segment will be returned to
-	 * the memory pool of its type, increasing its available limit for the later allocations.
+	 * the memory pool, increasing its available limit for the later allocations.
 	 *
 	 * @param segment The segment to be released.
-	 * @throws IllegalArgumentException Thrown, if the given segment is of an incompatible type.
 	 */
 	public void release(MemorySegment segment) {
 		Preconditions.checkState(!isShutDown, "Memory manager has been shut down.");
@@ -307,9 +300,8 @@ public class MemoryManager {
 			allocatedSegments.computeIfPresent(segment.getOwner(), (o, segsForOwner) -> {
 				segment.free();
 				if (segsForOwner.remove(segment)) {
-					budgetByType.releasePageForKey(getSegmentType(segment));
+					releaseMemory(getPageSize());
 				}
-				//noinspection ReturnOfNull
 				return segsForOwner.isEmpty() ? null : segsForOwner;
 			});
 		}
@@ -322,10 +314,9 @@ public class MemoryManager {
 	 * Tries to release many memory segments together.
 	 *
 	 * <p>The segment is only freed and made eligible for reclamation by the GC. Each segment will be returned to
-	 * the memory pool of its type, increasing its available limit for the later allocations.
+	 * the memory pool, increasing its available limit for the later allocations.
 	 *
 	 * @param segments The segments to be released.
-	 * @throws IllegalArgumentException Thrown, if the segments are of an incompatible type.
 	 */
 	public void release(Collection<MemorySegment> segments) {
 		if (segments == null) {
@@ -334,7 +325,7 @@ public class MemoryManager {
 
 		Preconditions.checkState(!isShutDown, "Memory manager has been shut down.");
 
-		EnumMap<MemoryType, Long> releasedMemory = new EnumMap<>(MemoryType.class);
+		AtomicLong releasedMemory = new AtomicLong(0L);
 
 		// since concurrent modifications to the collection
 		// can disturb the release, we need to try potentially multiple times
@@ -365,17 +356,17 @@ public class MemoryManager {
 			}
 		} while (!successfullyReleased);
 
-		budgetByType.releaseBudgetForKeys(releasedMemory);
+		releaseMemory(releasedMemory.get());
 	}
 
 	private MemorySegment releaseSegmentsForOwnerUntilNextOwner(
 			MemorySegment firstSeg,
 			Iterator<MemorySegment> segmentsIterator,
-			EnumMap<MemoryType, Long> releasedMemory) {
+			AtomicLong releasedMemory) {
 		AtomicReference<MemorySegment> nextOwnerMemorySegment = new AtomicReference<>();
 		Object owner = firstSeg.getOwner();
 		allocatedSegments.compute(owner, (o, segsForOwner) -> {
-			freeSegment(firstSeg, segsForOwner, releasedMemory);
+			releasedMemory.addAndGet(freeSegment(firstSeg, segsForOwner));
 			while (segmentsIterator.hasNext()) {
 				MemorySegment segment = segmentsIterator.next();
 				try {
@@ -387,26 +378,20 @@ public class MemoryManager {
 						nextOwnerMemorySegment.set(segment);
 						break;
 					}
-					freeSegment(segment, segsForOwner, releasedMemory);
+					releasedMemory.addAndGet(freeSegment(segment, segsForOwner));
 				} catch (Throwable t) {
 					throw new RuntimeException(
 						"Error removing book-keeping reference to allocated memory segment.", t);
 				}
 			}
-			//noinspection ReturnOfNull
 			return segsForOwner == null || segsForOwner.isEmpty() ? null : segsForOwner;
 		});
 		return nextOwnerMemorySegment.get();
 	}
 
-	private void freeSegment(
-			MemorySegment segment,
-			@Nullable Collection<MemorySegment> segments,
-			EnumMap<MemoryType, Long> releasedMemory) {
+	private long freeSegment(MemorySegment segment, @Nullable Collection<MemorySegment> segments) {
 		segment.free();
-		if (segments != null && segments.remove(segment)) {
-			releaseSegment(segment, releasedMemory);
-		}
+		return segments != null && segments.remove(segment) ? getPageSize() : 0L;
 	}
 
 	/**
@@ -430,125 +415,92 @@ public class MemoryManager {
 		}
 
 		// free each segment
-		EnumMap<MemoryType, Long> releasedMemory = new EnumMap<>(MemoryType.class);
+		long releasedMemory = 0L;
 		for (MemorySegment segment : segments) {
 			segment.free();
-			releaseSegment(segment, releasedMemory);
+			releasedMemory += getPageSize();
 		}
-		budgetByType.releaseBudgetForKeys(releasedMemory);
+		releaseMemory(releasedMemory);
 
 		segments.clear();
 	}
 
 	/**
-	 * Reserves memory of a certain type for an owner from this memory manager.
+	 * Reserves a memory chunk of a certain size for an owner from this memory manager.
 	 *
 	 * @param owner The owner to associate with the memory reservation, for the fallback release.
-	 * @param memoryType type of memory to reserve (heap / off-heap).
 	 * @param size size of memory to reserve.
 	 * @throws MemoryReservationException Thrown, if this memory manager does not have the requested amount
 	 *                                    of memory any more.
 	 */
-	public void reserveMemory(Object owner, MemoryType memoryType, long size) throws MemoryReservationException {
-		checkMemoryReservationPreconditions(owner, memoryType, size);
+	public void reserveMemory(Object owner, long size) throws MemoryReservationException {
+		checkMemoryReservationPreconditions(owner, size);
 		if (size == 0L) {
 			return;
 		}
 
-		long acquiredMemory = budgetByType.acquireBudgetForKey(memoryType, size);
-		if (acquiredMemory < size) {
-			throw new MemoryReservationException(
-				String.format("Could not allocate %d bytes. Only %d bytes are remaining.", size, acquiredMemory));
-		}
+		reserveMemory(size);
 
-		reservedMemory.compute(owner, (o, reservations) -> {
-			Map<MemoryType, Long> newReservations = reservations;
-			if (reservations == null) {
-				newReservations = new EnumMap<>(MemoryType.class);
-				newReservations.put(memoryType, size);
-			} else {
-				reservations.compute(
-					memoryType,
-					(mt, currentlyReserved) -> currentlyReserved == null ? size : currentlyReserved + size);
-			}
-			return newReservations;
-		});
+		reservedMemory.compute(owner, (o, memoryReservedForOwner) ->
+			memoryReservedForOwner == null ? size : memoryReservedForOwner + size);
 
 		Preconditions.checkState(!isShutDown, "Memory manager has been concurrently shut down.");
 	}
 
 	/**
-	 * Releases memory of a certain type from an owner to this memory manager.
+	 * Releases a memory chunk of a certain size from an owner to this memory manager.
 	 *
 	 * @param owner The owner to associate with the memory reservation, for the fallback release.
-	 * @param memoryType type of memory to release (heap / off-heap).
 	 * @param size size of memory to release.
 	 */
-	public void releaseMemory(Object owner, MemoryType memoryType, long size) {
-		checkMemoryReservationPreconditions(owner, memoryType, size);
+	public void releaseMemory(Object owner, long size) {
+		checkMemoryReservationPreconditions(owner, size);
 		if (size == 0L) {
 			return;
 		}
 
-		reservedMemory.compute(owner, (o, reservations) -> {
-			if (reservations != null) {
-				reservations.compute(
-					memoryType,
-					(mt, currentlyReserved) -> {
-						long newReservedMemory = 0;
-						if (currentlyReserved != null) {
-							if (currentlyReserved < size) {
-								LOG.warn(
-									"Trying to release more memory {} than it was reserved {} so far for the owner {}",
-									size,
-									currentlyReserved,
-									owner);
-							}
-
-							newReservedMemory = releaseAndCalculateReservedMemory(size, memoryType, currentlyReserved);
-						}
-
-						return newReservedMemory == 0 ? null : newReservedMemory;
-					});
+		reservedMemory.compute(owner, (o, currentlyReserved) -> {
+			long newReservedMemory = 0;
+			if (currentlyReserved != null) {
+				if (currentlyReserved < size) {
+					LOG.warn(
+						"Trying to release more memory {} than it was reserved {} so far for the owner {}",
+						size,
+						currentlyReserved,
+						owner);
+				}
+
+				newReservedMemory = releaseAndCalculateReservedMemory(size, currentlyReserved);
 			}
-			//noinspection ReturnOfNull
-			return reservations == null || reservations.isEmpty() ? null : reservations;
+
+			return newReservedMemory == 0 ? null : newReservedMemory;
 		});
 	}
 
-	private long releaseAndCalculateReservedMemory(long memoryToFree, MemoryType memoryType, long currentlyReserved) {
+	private long releaseAndCalculateReservedMemory(long memoryToFree, long currentlyReserved) {
 		final long effectiveMemoryToRelease = Math.min(currentlyReserved, memoryToFree);
-		budgetByType.releaseBudgetForKey(memoryType, effectiveMemoryToRelease);
+		releaseMemory(effectiveMemoryToRelease);
 
 		return currentlyReserved - effectiveMemoryToRelease;
 	}
 
-	private void checkMemoryReservationPreconditions(Object owner, MemoryType memoryType, long size) {
+	private void checkMemoryReservationPreconditions(Object owner, long size) {
 		Preconditions.checkNotNull(owner, "The memory owner must not be null.");
-		Preconditions.checkNotNull(memoryType, "The memory type must not be null.");
 		Preconditions.checkState(!isShutDown, "Memory manager has been shut down.");
 		Preconditions.checkArgument(size >= 0L, "The memory size (%s) has to have non-negative size", size);
 	}
 
 	/**
-	 * Releases all memory of a certain type from an owner to this memory manager.
+	 * Releases all reserved memory chunks from an owner to this memory manager.
 	 *
 	 * @param owner The owner to associate with the memory reservation, for the fallback release.
-	 * @param memoryType type of memory to release (heap / off-heap).
 	 */
-	public void releaseAllMemory(Object owner, MemoryType memoryType) {
-		checkMemoryReservationPreconditions(owner, memoryType, 0L);
-
-		reservedMemory.compute(owner, (o, reservations) -> {
-			if (reservations != null) {
-				Long size = reservations.remove(memoryType);
-				if (size != null) {
-					budgetByType.releaseBudgetForKey(memoryType, size);
-				}
-			}
-			//noinspection ReturnOfNull
-			return reservations == null || reservations.isEmpty() ? null : reservations;
-		});
+	public void releaseAllMemory(Object owner) {
+		checkMemoryReservationPreconditions(owner, 0L);
+		Long memoryReservedForOwner = reservedMemory.remove(owner);
+		if (memoryReservedForOwner != null) {
+			releaseMemory(memoryReservedForOwner);
+		}
 	}
 
 	// ------------------------------------------------------------------------
@@ -598,7 +550,7 @@ public class MemoryManager {
 		// and release should happen
 		final LongFunctionWithException<T, Exception> reserveAndInitialize = (size) -> {
 			try {
-				reserveMemory(type, MemoryType.OFF_HEAP, size);
+				reserveMemory(type, size);
 			} catch (MemoryReservationException e) {
 				throw new MemoryAllocationException("Could not created the shared memory resource of size " + size +
 					". Not enough memory left to reserve from the slot's managed memory.", e);
@@ -607,7 +559,7 @@ public class MemoryManager {
 			return initializer.apply(size);
 		};
 
-		final Consumer<Long> releaser = (size) -> releaseMemory(type, MemoryType.OFF_HEAP, size);
+		final Consumer<Long> releaser = (size) -> releaseMemory(type, size);
 
 		// This object identifies the lease in this request. It is used only to identify the release operation.
 		// Using the object to represent the lease is a bit nicer safer than just using a reference counter.
@@ -663,7 +615,7 @@ public class MemoryManager {
 	 */
 	public int getPageSize() {
 		//noinspection NumericCastThatLosesPrecision
-		return (int) budgetByType.getDefaultPageSize();
+		return (int) pageSize;
 	}
 
 	/**
@@ -672,27 +624,16 @@ public class MemoryManager {
 	 * @return The total size of memory.
 	 */
 	public long getMemorySize() {
-		return budgetByType.maxTotalBudget();
-	}
-
-	/**
-	 * Returns the total size of the certain type of memory handled by this memory manager.
-	 *
-	 * @param memoryType The type of memory.
-	 * @return The total size of memory.
-	 */
-	public long getMemorySizeByType(MemoryType memoryType) {
-		return budgetByType.maxTotalBudgetForKey(memoryType);
+		return totalMemorySize;
 	}
 
 	/**
 	 * Returns the total size of the certain type of memory handled by this memory manager.
 	 *
-	 * @param memoryType The type of memory.
-	 * @return The total size of memory.
+	 * @return The available amount of memory.
 	 */
-	public long availableMemory(MemoryType memoryType) {
-		return budgetByType.availableBudgetForKey(memoryType);
+	public long availableMemory() {
+		return availableMemorySize.get();
 	}
 
 	/**
@@ -709,7 +650,7 @@ public class MemoryManager {
 		}
 
 		//noinspection NumericCastThatLosesPrecision
-		return (int) (budgetByType.maxTotalNumberOfPages() * fraction);
+		return (int) (totalNumberOfPages * fraction);
 	}
 
 	/**
@@ -723,26 +664,45 @@ public class MemoryManager {
 			fraction > 0 && fraction <= 1,
 			"The fraction of memory to allocate must within (0, 1], was: %s", fraction);
 
-		return (long) (budgetByType.maxTotalBudget() * fraction);
+		//noinspection NumericCastThatLosesPrecision
+		return (long) Math.floor(totalMemorySize * fraction);
 	}
 
-	private MemorySegment allocateManagedSegment(MemoryType memoryType, Object owner) {
-		switch (memoryType) {
-			case HEAP:
-				return allocateUnpooledSegment(getPageSize(), owner);
-			case OFF_HEAP:
-				return allocateOffHeapUnsafeMemory(getPageSize(), owner);
-			default:
-				throw new IllegalArgumentException("unrecognized memory type: " + memoryType);
+	private void reserveMemory(long size) throws MemoryReservationException {
+		long availableOrReserved = tryReserveMemory(size);
+		if (availableOrReserved < size) {
+			throw new MemoryReservationException(
+				String.format("Could not allocate %d bytes, only %d bytes are remaining", size, availableOrReserved));
 		}
 	}
 
-	private void releaseSegment(MemorySegment segment, EnumMap<MemoryType, Long> releasedMemory) {
-		releasedMemory.compute(getSegmentType(segment), (t, v) -> v == null ? getPageSize() : v + getPageSize());
+	private long tryReserveMemory(long size) {
+		long currentAvailableMemorySize;
+		while (size <= (currentAvailableMemorySize = availableMemorySize.get())) {
+			if (availableMemorySize.compareAndSet(currentAvailableMemorySize, currentAvailableMemorySize - size)) {
+				return size;
+			}
+		}
+		return currentAvailableMemorySize;
 	}
 
-	private static MemoryType getSegmentType(MemorySegment segment) {
-		return segment.isOffHeap() ? MemoryType.OFF_HEAP : MemoryType.HEAP;
+	private void releaseMemory(@Nonnegative long size) {
+		if (size == 0) {
+			return;
+		}
+		boolean released = false;
+		long currentAvailableMemorySize = 0L;
+		while (!released && totalMemorySize >= (currentAvailableMemorySize = availableMemorySize.get()) + size) {
+			released = availableMemorySize
+				.compareAndSet(currentAvailableMemorySize, currentAvailableMemorySize + size);
+		}
+		if (!released) {
+			throw new IllegalStateException(String.format(
+				"Trying to release more managed memory (%d bytes) than has been allocated (%d bytes), the total size is %d bytes",
+				size,
+				currentAvailableMemorySize,
+				totalMemorySize));
+		}
 	}
 
 	/** Memory segment allocation request. */
@@ -757,18 +717,13 @@ public class MemoryManager {
 		/** Number of pages to allocate. */
 		private final int numberOfPages;
 
-		/** Allowed types of memory to allocate. */
-		private final Set<MemoryType> types;
-
 		private AllocationRequest(
 				Object owner,
 				Collection<MemorySegment> output,
-				int numberOfPages,
-				Set<MemoryType> types) {
+				int numberOfPages) {
 			this.owner = owner;
 			this.output = output;
 			this.numberOfPages = numberOfPages;
-			this.types = types;
 		}
 
 		public Object getOwner() {
@@ -779,20 +734,12 @@ public class MemoryManager {
 			return numberOfPages;
 		}
 
-		public Set<MemoryType> getTypes() {
-			return Collections.unmodifiableSet(types);
-		}
-
 		public static Builder newBuilder(Object owner) {
 			return new Builder(owner);
 		}
 
-		public static AllocationRequest ofAllTypes(Object owner, int numberOfPages) {
-			return newBuilder(owner).ofAllTypes().numberOfPages(numberOfPages).build();
-		}
-
-		public static AllocationRequest ofType(Object owner, int numberOfPages, MemoryType type) {
-			return newBuilder(owner).ofType(type).numberOfPages(numberOfPages).build();
+		public static AllocationRequest forOf(Object owner, int numberOfPages) {
+			return newBuilder(owner).numberOfPages(numberOfPages).build();
 		}
 	}
 
@@ -802,7 +749,6 @@ public class MemoryManager {
 		private final Object owner;
 		private Collection<MemorySegment> output = new ArrayList<>();
 		private int numberOfPages = 1;
-		private Set<MemoryType> types = EnumSet.noneOf(MemoryType.class);
 
 		public Builder(Object owner) {
 			this.owner = owner;
@@ -819,18 +765,8 @@ public class MemoryManager {
 			return this;
 		}
 
-		public Builder ofType(MemoryType type) {
-			types.add(type);
-			return this;
-		}
-
-		public Builder ofAllTypes() {
-			types = EnumSet.allOf(MemoryType.class);
-			return this;
-		}
-
 		public AllocationRequest build() {
-			return new AllocationRequest(owner, output, numberOfPages, types);
+			return new AllocationRequest(owner, output, numberOfPages);
 		}
 	}
 
@@ -839,8 +775,6 @@ public class MemoryManager {
 	// ------------------------------------------------------------------------
 
 	public static MemoryManager forDefaultPageSize(long size) {
-		final Map<MemoryType, Long> memorySizes = new HashMap<>();
-		memorySizes.put(MemoryType.OFF_HEAP, size);
-		return new MemoryManager(memorySizes, DEFAULT_PAGE_SIZE);
+		return new MemoryManager(size, DEFAULT_PAGE_SIZE);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
index 779d6a9..1dc569f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.taskexecutor.slot;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -32,7 +31,6 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -316,8 +314,6 @@ public class TaskSlot<T extends TaskSlotPayload> implements AutoCloseableAsync {
 	}
 
 	private static MemoryManager createMemoryManager(ResourceProfile resourceProfile, int pageSize) {
-		Map<MemoryType, Long> memorySizeByType =
-			Collections.singletonMap(MemoryType.OFF_HEAP, resourceProfile.getManagedMemory().getBytes());
-		return new MemoryManager(memorySizeByType, pageSize);
+		return new MemoryManager(resourceProfile.getManagedMemory().getBytes(), pageSize);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyedBudgetManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyedBudgetManager.java
deleted file mode 100644
index f7d0855..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyedBudgetManager.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
-import javax.annotation.concurrent.ThreadSafe;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-/**
- * Manages {@code long} available budget per key (allocation/release).
- *
- * <p>This manager gets a certain maximum {@code long} budget per key.
- * Users can acquire some budget for some key and release it later.
- * The manager keeps track of acquired/released budget and prevents from over-allocating.
- *
- * <p>There is also a paged type of allocation where a certain number of pages can be acquired from a set of keys.
- * The page has its budget size. The manager acquires randomly from all keys of a given set.
- * At the end, sum of pages acquired from each key is either requested number of pages or none.
- * Only integer number of pages are acquired from each key respecting its available budget (no page spans two or more keys)
- * or nothing is acquired reporting the maximum number of pages which could be acquired per each given key at the moment.
- *
- * @param <K> type of the budget key
- */
-@ThreadSafe
-public class KeyedBudgetManager<K> {
-	private final Map<K, Long> maxBudgetByKey;
-
-	private final long defaultPageSize;
-
-	private final long totalNumberOfPages;
-
-	@GuardedBy("lock")
-	private final Map<K, Long> availableBudgetByKey;
-
-	private final Object lock = new Object();
-
-	public KeyedBudgetManager(Map<K, Long> maxBudgetByKey, long defaultPageSize) {
-		Preconditions.checkNotNull(maxBudgetByKey);
-		Preconditions.checkArgument(defaultPageSize > 0L, "The default page size has to be greater than zero");
-
-		this.maxBudgetByKey = new HashMap<>(maxBudgetByKey);
-		this.availableBudgetByKey = new HashMap<>(maxBudgetByKey);
-		this.defaultPageSize = defaultPageSize;
-		this.totalNumberOfPages = calculateTotalNumberOfPages(maxBudgetByKey, defaultPageSize);
-	}
-
-	public long getDefaultPageSize() {
-		return defaultPageSize;
-	}
-
-	/**
-	 * Tries to acquire budget for a given key.
-	 *
-	 * <p>No budget is acquired if it was not possible to fully acquire the requested budget.
-	 *
-	 * @param key the key to acquire budget from
-	 * @param size the size of budget to acquire from the given key
-	 * @return the fully acquired budget for the key or max possible budget to acquire
-	 * if it was not possible to acquire the requested budget.
-	 */
-	public long acquireBudgetForKey(K key, long size) {
-		Preconditions.checkNotNull(key);
-		AcquisitionResult<K> result = acquirePagedBudgetForKeys(Collections.singletonList(key), size, 1L);
-		return result.isSuccess() ?
-			result.getAcquiredPerKey().get(key) : result.getTotalAvailableForAllQueriedKeys();
-	}
-
-	/**
-	 * Tries to acquire budget for given keys which equals to the number of pages times default page size.
-	 *
-	 * <p>See also {@link #acquirePagedBudgetForKeys(Iterable, long, long)}
-	 */
-	public AcquisitionResult<K> acquirePagedBudget(Iterable<K> keys, long numberOfPages) {
-		return acquirePagedBudgetForKeys(keys, numberOfPages, defaultPageSize);
-	}
-
-	/**
-	 * Tries to acquire budget which equals to the number of pages times page size.
-	 *
-	 * <p>The budget will be acquired only from the given keys. Only integer number of pages will be acquired from each key.
-	 * If the next page does not fit into the available budget of some key, it will try to be acquired from another key.
-	 * The acquisition is successful if the acquired number of pages for each key sums up to the requested number of pages.
-	 * The function does not make any preference about which keys from the given keys to acquire from.
-	 *
-	 * @param keys the keys to acquire budget from
-	 * @param numberOfPages the total number of pages to acquire from the given keys
-	 * @param pageSize the size of budget to acquire per page
-	 * @return the acquired number of pages for each key if the acquisition is successful or
-	 * the total number of pages which were available for the given keys.
-	 */
-	AcquisitionResult<K> acquirePagedBudgetForKeys(Iterable<K> keys, long numberOfPages, long pageSize) {
-		Preconditions.checkNotNull(keys);
-		Preconditions.checkArgument(numberOfPages >= 0L, "The requested number of pages has to be positive");
-		Preconditions.checkArgument(pageSize > 0L, "The page size has to be greater than zero");
-
-		synchronized (lock) {
-			long leftPagesToReserve = numberOfPages;
-			Map<K, Long> pagesToReserveByKey = new HashMap<>();
-			for (K key : keys) {
-				long availableBudgetOfCurrentKey = availableBudgetByKey.getOrDefault(key, 0L);
-				long availablePagesOfCurrentKey = availableBudgetOfCurrentKey / pageSize;
-				if (leftPagesToReserve <= availablePagesOfCurrentKey) {
-					pagesToReserveByKey.put(key, leftPagesToReserve);
-					leftPagesToReserve = 0L;
-					break;
-				} else if (availablePagesOfCurrentKey > 0L) {
-					pagesToReserveByKey.put(key, availablePagesOfCurrentKey);
-					leftPagesToReserve -= availablePagesOfCurrentKey;
-				}
-			}
-			boolean possibleToAcquire = leftPagesToReserve == 0L;
-			if (possibleToAcquire) {
-				for (Entry<K, Long> pagesToReserveForKey : pagesToReserveByKey.entrySet()) {
-					//noinspection ConstantConditions
-					availableBudgetByKey.compute(
-						pagesToReserveForKey.getKey(),
-						(k, v) -> v - (pagesToReserveForKey.getValue() * pageSize));
-				}
-			}
-			return possibleToAcquire ?
-				AcquisitionResult.success(pagesToReserveByKey) : AcquisitionResult.failure(numberOfPages - leftPagesToReserve);
-		}
-	}
-
-	public void releasePageForKey(K key) {
-		releaseBudgetForKey(key, defaultPageSize);
-	}
-
-	public void releaseBudgetForKey(K key, long size) {
-		Preconditions.checkNotNull(key);
-		Preconditions.checkArgument(size >= 0L, "The budget to release has to be positive");
-
-		releaseBudgetForKeys(Collections.singletonMap(key, size));
-	}
-
-	public void releaseBudgetForKeys(Map<K, Long> sizeByKey) {
-		Preconditions.checkNotNull(sizeByKey);
-
-		synchronized (lock) {
-			for (Entry<K, Long> toReleaseForKey : sizeByKey.entrySet()) {
-				long toRelease = toReleaseForKey.getValue();
-				Preconditions.checkArgument(
-					toRelease >= 0L,
-					"The budget to release for key %s has to be positive",
-					toReleaseForKey.getKey());
-				if (toRelease == 0L) {
-					continue;
-				}
-				K keyToReleaseFor = toReleaseForKey.getKey();
-				long maxBudgetForKey = maxBudgetByKey.get(keyToReleaseFor);
-				availableBudgetByKey.compute(keyToReleaseFor, (k, currentBudget) -> {
-					if (currentBudget == null) {
-						throw new IllegalArgumentException("The budget key is not supported: " + keyToReleaseFor);
-					} else if (currentBudget + toRelease > maxBudgetForKey) {
-						throw new IllegalStateException(
-							String.format(
-								"The budget to release %d exceeds the limit %d for key %s",
-								toRelease,
-								maxBudgetForKey,
-								keyToReleaseFor));
-					} else {
-						return currentBudget + toRelease;
-					}
-				});
-			}
-		}
-	}
-
-	public void releaseAll() {
-		synchronized (lock) {
-			availableBudgetByKey.putAll(maxBudgetByKey);
-		}
-	}
-
-	public long maxTotalBudget() {
-		return maxBudgetByKey.values().stream().mapToLong(b -> b).sum();
-	}
-
-	public long maxTotalNumberOfPages() {
-		return totalNumberOfPages;
-	}
-
-	public long maxTotalBudgetForKey(K key) {
-		Preconditions.checkNotNull(key);
-		return maxBudgetByKey.get(key);
-	}
-
-	public long totalAvailableBudget() {
-		return availableBudgetForKeys(maxBudgetByKey.keySet());
-	}
-
-	long availableBudgetForKeys(Iterable<K> keys) {
-		Preconditions.checkNotNull(keys);
-		synchronized (lock) {
-			long totalSize = 0L;
-			for (K key : keys) {
-				totalSize += availableBudgetForKey(key);
-			}
-			return totalSize;
-		}
-	}
-
-	public long availableBudgetForKey(K key) {
-		Preconditions.checkNotNull(key);
-		synchronized (lock) {
-			return availableBudgetByKey.getOrDefault(key, 0L);
-		}
-	}
-
-	private static <K> long calculateTotalNumberOfPages(Map<K, Long> budgetByType, long pageSize) {
-		long numPages = 0L;
-		for (long sizeForType : budgetByType.values()) {
-			numPages += sizeForType / pageSize;
-		}
-		return numPages;
-	}
-
-	/**
-	 * Result of budget acquisition to return from acquisition functions.
-	 *
-	 * <p>The result of acquisition is either success: {@link AcquisitionResult#isSuccess()} and this class contains
-	 * acquired budget/pages per key: {@link AcquisitionResult#getAcquiredPerKey()} or
-	 * it is failure: {@link AcquisitionResult#isFailure()} and this class contains total max available budget for all
-	 * queried keys: {@link AcquisitionResult#getTotalAvailableForAllQueriedKeys()} which was not enough to
-	 * acquire the requested number of pages.
-	 */
-	public static class AcquisitionResult<K> {
-		@Nullable
-		private final Map<K, Long> acquiredBudgetPerKey;
-
-		@Nullable
-		private final Long totalAvailableBudgetForAllQueriedKeys;
-
-		private AcquisitionResult(
-				@Nullable Map<K, Long> acquiredBudgetPerKey,
-				@Nullable Long totalAvailableBudgetForAllQueriedKeys) {
-			this.acquiredBudgetPerKey = acquiredBudgetPerKey;
-			this.totalAvailableBudgetForAllQueriedKeys = totalAvailableBudgetForAllQueriedKeys;
-		}
-
-		public static <K> AcquisitionResult<K> success(Map<K, Long> acquiredBudgetPerKey) {
-			return new AcquisitionResult<>(acquiredBudgetPerKey, null);
-		}
-
-		public static <K> AcquisitionResult<K> failure(long totalAvailableBudgetForAllQueriedKeys) {
-			return new AcquisitionResult<>(null, totalAvailableBudgetForAllQueriedKeys);
-		}
-
-		public boolean isSuccess() {
-			return acquiredBudgetPerKey != null;
-		}
-
-		public boolean isFailure() {
-			return totalAvailableBudgetForAllQueriedKeys != null;
-		}
-
-		public Map<K, Long> getAcquiredPerKey() {
-			if (acquiredBudgetPerKey == null) {
-				throw new IllegalStateException("The acquisition failed. Nothing was acquired.");
-			}
-			return Collections.unmodifiableMap(acquiredBudgetPerKey);
-		}
-
-		public long getTotalAvailableForAllQueriedKeys() {
-			if (totalAvailableBudgetForAllQueriedKeys == null) {
-				throw new IllegalStateException("The acquisition succeeded. All requested pages were acquired.");
-			}
-			return totalAvailableBudgetForAllQueriedKeys;
-		}
-	}
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerBuilder.java
index 889f9cd..91599ac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerBuilder.java
@@ -18,29 +18,19 @@
 
 package org.apache.flink.runtime.memory;
 
-import org.apache.flink.core.memory.MemoryType;
-
-import java.util.EnumMap;
-import java.util.Map;
-
 import static org.apache.flink.runtime.memory.MemoryManager.DEFAULT_PAGE_SIZE;
 
 /** Builder class for {@link MemoryManager}. */
 public class MemoryManagerBuilder {
 	private static final long DEFAULT_MEMORY_SIZE = 32L * DEFAULT_PAGE_SIZE;
 
-	private final Map<MemoryType, Long> memoryPools = new EnumMap<>(MemoryType.class);
+	private long memorySize = DEFAULT_MEMORY_SIZE;
 	private int pageSize = DEFAULT_PAGE_SIZE;
 
 	private MemoryManagerBuilder() {}
 
 	public MemoryManagerBuilder setMemorySize(long memorySize) {
-		this.memoryPools.put(MemoryType.HEAP, memorySize);
-		return this;
-	}
-
-	public MemoryManagerBuilder setMemorySize(MemoryType memoryType, long memorySize) {
-		this.memoryPools.put(memoryType, memorySize);
+		this.memorySize = memorySize;
 		return this;
 	}
 
@@ -50,10 +40,7 @@ public class MemoryManagerBuilder {
 	}
 
 	public MemoryManager build() {
-		if (memoryPools.isEmpty()) {
-			memoryPools.put(MemoryType.HEAP, DEFAULT_MEMORY_SIZE);
-		}
-		return new MemoryManager(memoryPools, pageSize);
+		return new MemoryManager(memorySize, pageSize);
 	}
 
 	public static MemoryManagerBuilder newBuilder() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerSharedResourcesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerSharedResourcesTest.java
index 9f49fe2..d501684 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerSharedResourcesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerSharedResourcesTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.memory;
 
-import org.apache.flink.core.memory.MemoryType;
-
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -77,18 +75,18 @@ public class MemoryManagerSharedResourcesTest {
 
 		memoryManager.getSharedMemoryResourceForManagedMemory("type", TestResource::new, 0.5);
 
-		assertEquals(memoryManager.getMemorySize() / 2, memoryManager.availableMemory(MemoryType.OFF_HEAP));
+		assertEquals(memoryManager.getMemorySize() / 2, memoryManager.availableMemory());
 	}
 
 	@Test
 	public void getExistingDoesNotAllocateAdditionalMemory() throws Exception {
 		final MemoryManager memoryManager = createMemoryManager();
 		memoryManager.getSharedMemoryResourceForManagedMemory("type", TestResource::new, 0.8);
-		final long freeMemory = memoryManager.availableMemory(MemoryType.OFF_HEAP);
+		final long freeMemory = memoryManager.availableMemory();
 
 		memoryManager.getSharedMemoryResourceForManagedMemory("type", TestResource::new, 0.8);
 
-		assertEquals(freeMemory, memoryManager.availableMemory(MemoryType.OFF_HEAP));
+		assertEquals(freeMemory, memoryManager.availableMemory());
 	}
 
 	@Test
@@ -222,7 +220,7 @@ public class MemoryManagerSharedResourcesTest {
 
 		// this is to guard test assumptions
 		assertEquals(size, mm.getMemorySize());
-		assertEquals(size, mm.availableMemory(MemoryType.OFF_HEAP));
+		assertEquals(size, mm.availableMemory());
 
 		return mm;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
index 75a3151..6a553df 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.memory;
 
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager.AllocationRequest;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
@@ -31,17 +30,11 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.EnumMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Random;
 
-import static org.apache.flink.runtime.memory.MemoryManager.AllocationRequest.ofAllTypes;
-import static org.apache.flink.runtime.memory.MemoryManager.AllocationRequest.ofType;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.number.OrderingComparison.lessThanOrEqualTo;
+import static org.apache.flink.runtime.memory.MemoryManager.AllocationRequest.forOf;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
 /**
@@ -65,8 +58,7 @@ public class MemoryManagerTest {
 	public void setUp() {
 		this.memoryManager = MemoryManagerBuilder
 			.newBuilder()
-			.setMemorySize(MemoryType.HEAP, MEMORY_SIZE / 2)
-			.setMemorySize(MemoryType.OFF_HEAP, MEMORY_SIZE / 2)
+			.setMemorySize(MEMORY_SIZE)
 			.setPageSize(PAGE_SIZE)
 			.build();
 		this.random = new Random(RANDOM_SEED);
@@ -173,7 +165,7 @@ public class MemoryManagerTest {
 
 			List<MemorySegment> segs = this.memoryManager.allocatePages(mockInvoke, NUM_PAGES);
 
-			testCannotAllocateAnymore(ofAllTypes(mockInvoke, 1));
+			testCannotAllocateAnymore(forOf(mockInvoke, 1));
 
 			Assert.assertTrue("The previously allocated segments were not valid any more.",
 																	allMemorySegmentsValid(segs));
@@ -190,13 +182,13 @@ public class MemoryManagerTest {
 	public void doubleReleaseReturnsMemoryOnlyOnce() throws MemoryAllocationException {
 		final AbstractInvokable mockInvoke = new DummyInvokable();
 
-		Collection<MemorySegment> segs = this.memoryManager.allocatePages(ofAllTypes(mockInvoke, NUM_PAGES));
+		Collection<MemorySegment> segs = this.memoryManager.allocatePages(forOf(mockInvoke, NUM_PAGES));
 		MemorySegment segment = segs.iterator().next();
 
 		this.memoryManager.release(segment);
 		this.memoryManager.release(segment);
 
-		testCannotAllocateAnymore(ofAllTypes(mockInvoke, 2));
+		testCannotAllocateAnymore(forOf(mockInvoke, 2));
 
 		this.memoryManager.releaseAll(mockInvoke);
 	}
@@ -220,116 +212,86 @@ public class MemoryManagerTest {
 	}
 
 	@Test
-	@SuppressWarnings("NumericCastThatLosesPrecision")
-	public void testAllocateMixedMemoryType() throws MemoryAllocationException {
-		int totalHeapPages = (int) memoryManager.getMemorySizeByType(MemoryType.HEAP) / PAGE_SIZE;
-		int totalOffHeapPages = (int) memoryManager.getMemorySizeByType(MemoryType.OFF_HEAP) / PAGE_SIZE;
-		int pagesToAllocate =  totalHeapPages + totalOffHeapPages / 2;
-
+	public void testMemoryReservation() throws MemoryReservationException {
 		Object owner = new Object();
-		Collection<MemorySegment> segments = memoryManager.allocatePages(ofAllTypes(owner, pagesToAllocate));
-		Map<MemoryType, Integer> split = calcMemoryTypeSplitForSegments(segments);
-
-		assertThat(split.get(MemoryType.HEAP), lessThanOrEqualTo(totalHeapPages));
-		assertThat(split.get(MemoryType.OFF_HEAP), lessThanOrEqualTo(totalOffHeapPages));
-		assertThat(split.get(MemoryType.HEAP) + split.get(MemoryType.OFF_HEAP), is(pagesToAllocate));
 
-		memoryManager.release(segments);
-	}
-
-	private static Map<MemoryType, Integer> calcMemoryTypeSplitForSegments(Iterable<MemorySegment> segments) {
-		int heapPages = 0;
-		int offHeapPages = 0;
-		for (MemorySegment memorySegment : segments) {
-			if (memorySegment.isOffHeap()) {
-				offHeapPages++;
-			} else {
-				heapPages++;
-			}
-		}
-		Map<MemoryType, Integer> split = new EnumMap<>(MemoryType.class);
-		split.put(MemoryType.HEAP, heapPages);
-		split.put(MemoryType.OFF_HEAP, offHeapPages);
-		return split;
+		memoryManager.reserveMemory(owner, PAGE_SIZE);
+		memoryManager.releaseMemory(owner, PAGE_SIZE);
 	}
 
 	@Test
-	public void testMemoryReservation() throws MemoryReservationException {
+	public void testAllMemoryReservation() throws MemoryReservationException {
 		Object owner = new Object();
 
-		memoryManager.reserveMemory(owner, MemoryType.HEAP, PAGE_SIZE);
-		memoryManager.reserveMemory(owner, MemoryType.OFF_HEAP, memoryManager.getMemorySizeByType(MemoryType.OFF_HEAP));
-
-		memoryManager.releaseMemory(owner, MemoryType.HEAP, PAGE_SIZE);
-		memoryManager.releaseAllMemory(owner, MemoryType.OFF_HEAP);
+		memoryManager.reserveMemory(owner, memoryManager.getMemorySize());
+		memoryManager.releaseAllMemory(owner);
 	}
 
 	@Test
 	public void testCannotReserveBeyondTheLimit() throws MemoryReservationException {
 		Object owner = new Object();
-		memoryManager.reserveMemory(owner, MemoryType.OFF_HEAP, memoryManager.getMemorySizeByType(MemoryType.OFF_HEAP));
-		testCannotReserveAnymore(MemoryType.OFF_HEAP, 1L);
-		memoryManager.releaseAllMemory(owner, MemoryType.OFF_HEAP);
+		memoryManager.reserveMemory(owner, memoryManager.getMemorySize());
+		testCannotReserveAnymore(1L);
+		memoryManager.releaseAllMemory(owner);
 	}
 
 	@Test
 	public void testMemoryTooBigReservation() {
-		long size = memoryManager.getMemorySizeByType(MemoryType.HEAP) + PAGE_SIZE;
-		testCannotReserveAnymore(MemoryType.HEAP, size);
+		long size = memoryManager.getMemorySize() + PAGE_SIZE;
+		testCannotReserveAnymore(size);
 	}
 
 	@Test
 	public void testMemoryReleaseMultipleTimes() throws MemoryReservationException {
 		Object owner = new Object();
 		Object owner2 = new Object();
-		long totalHeapMemorySize = memoryManager.availableMemory(MemoryType.HEAP);
+		long totalHeapMemorySize = memoryManager.availableMemory();
 		// to prevent memory size exceeding the limit, reserve some memory from another owner.
-		memoryManager.reserveMemory(owner2, MemoryType.HEAP, PAGE_SIZE);
+		memoryManager.reserveMemory(owner2, PAGE_SIZE);
 
 		// reserve once but release twice
-		memoryManager.reserveMemory(owner, MemoryType.HEAP, PAGE_SIZE);
-		memoryManager.releaseMemory(owner, MemoryType.HEAP, PAGE_SIZE);
-		memoryManager.releaseMemory(owner, MemoryType.HEAP, PAGE_SIZE);
-		long heapMemoryLeft = memoryManager.availableMemory(MemoryType.HEAP);
+		memoryManager.reserveMemory(owner, PAGE_SIZE);
+		memoryManager.releaseMemory(owner, PAGE_SIZE);
+		memoryManager.releaseMemory(owner, PAGE_SIZE);
+		long heapMemoryLeft = memoryManager.availableMemory();
 		assertEquals("Memory leak happens", totalHeapMemorySize - PAGE_SIZE, heapMemoryLeft);
-		memoryManager.releaseAllMemory(owner2, MemoryType.HEAP);
+		memoryManager.releaseAllMemory(owner2);
 	}
 
 	@Test
 	public void testMemoryReleaseMoreThanReserved() throws MemoryReservationException {
 		Object owner = new Object();
 		Object owner2 = new Object();
-		long totalHeapMemorySize = memoryManager.availableMemory(MemoryType.HEAP);
+		long totalHeapMemorySize = memoryManager.availableMemory();
 		// to prevent memory size exceeding the limit, reserve some memory from another owner.
-		memoryManager.reserveMemory(owner2, MemoryType.HEAP, PAGE_SIZE);
+		memoryManager.reserveMemory(owner2, PAGE_SIZE);
 
 		// release more than reserved size
-		memoryManager.reserveMemory(owner, MemoryType.HEAP, PAGE_SIZE);
-		memoryManager.releaseMemory(owner, MemoryType.HEAP, PAGE_SIZE * 2);
-		long heapMemoryLeft = memoryManager.availableMemory(MemoryType.HEAP);
+		memoryManager.reserveMemory(owner, PAGE_SIZE);
+		memoryManager.releaseMemory(owner, PAGE_SIZE * 2);
+		long heapMemoryLeft = memoryManager.availableMemory();
 		assertEquals("Memory leak happens", totalHeapMemorySize - PAGE_SIZE, heapMemoryLeft);
-		memoryManager.releaseAllMemory(owner2, MemoryType.HEAP);
+		memoryManager.releaseAllMemory(owner2);
 	}
 
 	@Test
 	public void testMemoryAllocationAndReservation() throws MemoryAllocationException, MemoryReservationException {
-		MemoryType type = MemoryType.OFF_HEAP;
 		@SuppressWarnings("NumericCastThatLosesPrecision")
-		int totalPagesForType = (int) memoryManager.getMemorySizeByType(type) / PAGE_SIZE;
+		int totalPagesForType = (int) memoryManager.getMemorySize() / PAGE_SIZE;
 
 		// allocate half memory for segments
 		Object owner1 = new Object();
-		memoryManager.allocatePages(ofType(owner1, totalPagesForType / 2, MemoryType.OFF_HEAP));
+		memoryManager.allocatePages(forOf(owner1, totalPagesForType / 2));
 
 		// reserve the other half of memory
 		Object owner2 = new Object();
-		memoryManager.reserveMemory(owner2, type, (long) PAGE_SIZE * totalPagesForType / 2);
+		memoryManager.reserveMemory(owner2, (long) PAGE_SIZE * totalPagesForType / 2);
 
-		testCannotAllocateAnymore(ofType(new Object(), 1, type));
-		testCannotReserveAnymore(type, 1L);
+		testCannotAllocateAnymore(forOf(new Object(), 1));
+		testCannotReserveAnymore(1L);
 
 		memoryManager.releaseAll(owner1);
-		memoryManager.releaseAllMemory(owner2, type);
+		memoryManager.releaseAllMemory(owner2);
 	}
 
 	@Test
@@ -366,9 +328,9 @@ public class MemoryManagerTest {
 		}
 	}
 
-	private void testCannotReserveAnymore(MemoryType type, long size) {
+	private void testCannotReserveAnymore(long size) {
 		try {
-			memoryManager.reserveMemory(new Object(), type, size);
+			memoryManager.reserveMemory(new Object(), size);
 			Assert.fail("Expected MemoryAllocationException. " +
 				"We should not be able to any more memory after allocating or(and) reserving all memory of a certain type.");
 		} catch (MemoryReservationException maex) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 52b00b8..01306f6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
@@ -148,7 +147,7 @@ public class MockEnvironment implements Environment, AutoCloseable {
 		this.inputs = new LinkedList<InputGate>();
 		this.outputs = new LinkedList<ResultPartitionWriter>();
 
-		this.memManager = MemoryManagerBuilder.newBuilder().setMemorySize(MemoryType.OFF_HEAP, offHeapMemorySize).build();
+		this.memManager = MemoryManagerBuilder.newBuilder().setMemorySize(offHeapMemorySize).build();
 		this.ioManager = ioManager;
 		this.taskManagerRuntimeInfo = taskManagerRuntimeInfo;
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyedBudgetManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyedBudgetManagerTest.java
deleted file mode 100644
index 0d431bd..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyedBudgetManagerTest.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import org.apache.flink.runtime.util.KeyedBudgetManager.AcquisitionResult;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.util.TestLogger;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
-import java.util.stream.Collectors;
-import java.util.stream.LongStream;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-
-/**
- * Test suite for {@link KeyedBudgetManager}.
- */
-@SuppressWarnings("MagicNumber")
-public class KeyedBudgetManagerTest extends TestLogger {
-	private static final String[] TEST_KEYS = {"k1", "k2", "k3", "k4"};
-	private static final long[] TEST_BUDGETS = {15, 17, 22, 11};
-	private static final Executor NEW_THREAD_EXECUTOR = r -> new Thread(r).start();
-
-	private KeyedBudgetManager<String> keyedBudgetManager;
-
-	@Before
-	public void setup() {
-		keyedBudgetManager = createSimpleKeyedBudget();
-	}
-
-	@After
-	public void teardown() {
-		keyedBudgetManager.releaseAll();
-		checkNoKeyBudgetChange();
-	}
-
-	@Test
-	public void testSuccessfulAcquisitionForKey() {
-		long acquired = keyedBudgetManager.acquireBudgetForKey("k1", 10L);
-
-		assertThat(acquired, is(10L));
-		checkOneKeyBudgetChange("k1", 5L);
-	}
-
-	@Test
-	public void testFailedAcquisitionForKey() {
-		long maxPossibleBudgetToAcquire = keyedBudgetManager.acquireBudgetForKey("k1", 20L);
-
-		assertThat(maxPossibleBudgetToAcquire, is(15L));
-		checkNoKeyBudgetChange();
-	}
-
-	@Test
-	public void testSuccessfulReleaseForKey() {
-		keyedBudgetManager.acquireBudgetForKey("k1", 10L);
-		keyedBudgetManager.releaseBudgetForKey("k1", 5L);
-
-		checkOneKeyBudgetChange("k1", 10L);
-	}
-
-	@Test
-	public void testFailedReleaseForKey() {
-		keyedBudgetManager.acquireBudgetForKey("k1", 10L);
-		try {
-			keyedBudgetManager.releaseBudgetForKey("k1", 15L);
-			fail("IllegalStateException is expected to fail over-sized release");
-		} catch (IllegalStateException e) {
-			// expected
-		}
-
-		checkOneKeyBudgetChange("k1", 5L);
-	}
-
-	@Test
-	public void testSuccessfulAcquisitionForKeys() {
-		AcquisitionResult<String> acquired = acquireForMultipleKeys(5L);
-
-		assertThat(checkAcquisitionSuccess(acquired, 4L), is(true));
-
-		assertThat(keyedBudgetManager.availableBudgetForKey("k1"), is(15L));
-		assertThat(keyedBudgetManager.availableBudgetForKeys(Arrays.asList("k2", "k3")), is(19L));
-		assertThat(keyedBudgetManager.totalAvailableBudget(), is(45L));
-	}
-
-	@Test
-	public void testConcurrentAcquisitionForKeys() throws ExecutionException, InterruptedException {
-		long pageSize = 5L;
-		CompletableFuture<AcquisitionResult<String>> allocation1 = acquireForMultipleKeysAsync(pageSize);
-		CompletableFuture<Long> availableBudgetForKeysFuture = getAvailableBudgetForKeysAsync();
-		CompletableFuture<AcquisitionResult<String>> allocation2 = acquireForMultipleKeysAsync(pageSize);
-		Arrays
-			.asList(allocation1, allocation2, availableBudgetForKeysFuture)
-			.forEach(KeyedBudgetManagerTest::waitForFutureSilently);
-
-		boolean firstSucceeded = checkFirstAcquisitionSucceeded(allocation1, allocation2);
-		boolean secondSucceeded = checkFirstAcquisitionSucceeded(allocation2, allocation1);
-		assertThat(firstSucceeded || secondSucceeded, is(true));
-
-		long availableBudgetForKeys = availableBudgetForKeysFuture.get();
-		assertThat(availableBudgetForKeys == 39L || availableBudgetForKeys == 19L, is(true));
-	}
-
-	@Test
-	public void testConcurrentReleaseForKeys() throws ExecutionException, InterruptedException {
-		long pageSize = 5L;
-		Map<String, Long> sizeByKey = acquireForMultipleKeys(pageSize)
-			.getAcquiredPerKey()
-			.entrySet()
-			.stream()
-			.collect(Collectors.toMap(Entry::getKey, e -> e.getValue() * pageSize));
-
-		CompletableFuture<Void> release1 = releaseKeysAsync(sizeByKey);
-		CompletableFuture<Long> availableBudgetForKeysFuture = getAvailableBudgetForKeysAsync();
-		CompletableFuture<Void> release2 = releaseKeysAsync(sizeByKey);
-		Arrays
-			.asList(release1, availableBudgetForKeysFuture, release2)
-			.forEach(KeyedBudgetManagerTest::waitForFutureSilently);
-
-		boolean firstSucceeded = !release1.isCompletedExceptionally() && release2.isCompletedExceptionally();
-		boolean secondSucceeded = !release2.isCompletedExceptionally() && release1.isCompletedExceptionally();
-		assertThat(firstSucceeded || secondSucceeded, is(true));
-
-		long availableBudgetForKeys = availableBudgetForKeysFuture.get();
-		assertThat(availableBudgetForKeys == 39L || availableBudgetForKeys == 19L, is(true));
-
-		checkNoKeyBudgetChange();
-	}
-
-	@Test
-	public void testFailedAcquisitionForKeys() {
-		AcquisitionResult<String> acquired =
-			keyedBudgetManager.acquirePagedBudgetForKeys(Arrays.asList("k2", "k3"), 6, 6);
-
-		assertThat(acquired.isFailure(), is(true));
-		assertThat(acquired.getTotalAvailableForAllQueriedKeys(), is(5L));
-		checkNoKeyBudgetChange();
-	}
-
-	@Test
-	public void testSuccessfulReleaseForKeys() {
-		keyedBudgetManager.acquirePagedBudgetForKeys(Arrays.asList("k2", "k3"), 4, 8);
-		keyedBudgetManager.releaseBudgetForKeys(createdBudgetMap(new String[] {"k2", "k3"}, new long[] {7, 10}));
-
-		assertThat(keyedBudgetManager.availableBudgetForKeys(Arrays.asList("k2", "k3")), is(24L));
-		assertThat(keyedBudgetManager.availableBudgetForKeys(Arrays.asList("k1", "k4")), is(26L));
-		assertThat(keyedBudgetManager.totalAvailableBudget(), is(50L));
-	}
-
-	@Test
-	public void testSuccessfulReleaseForKeysWithMixedRequests() {
-		keyedBudgetManager.acquirePagedBudgetForKeys(Arrays.asList("k2", "k3"), 4, 8);
-		keyedBudgetManager.acquirePagedBudgetForKeys(Arrays.asList("k1", "k4"), 6, 3);
-		keyedBudgetManager.releaseBudgetForKeys(createdBudgetMap(new String[] {"k2", "k3"}, new long[] {7, 10}));
-
-		assertThat(keyedBudgetManager.availableBudgetForKeys(Arrays.asList("k2", "k3")), is(24L));
-		assertThat(keyedBudgetManager.availableBudgetForKeys(Arrays.asList("k1", "k4")), is(8L));
-		assertThat(keyedBudgetManager.totalAvailableBudget(), is(32L));
-	}
-
-	private void checkNoKeyBudgetChange() {
-		checkKeysBudgetChange(Collections.emptyMap());
-	}
-
-	private void checkOneKeyBudgetChange(
-			@SuppressWarnings("SameParameterValue") String key,
-			long budget) {
-		checkKeysBudgetChange(Collections.singletonMap(key, budget));
-	}
-
-	private void checkKeysBudgetChange(
-			Map<String, Long> changedBudgetPerKey) {
-		long totalExpectedBudget = 0L;
-		for (int i = 0; i < TEST_KEYS.length; i++) {
-			long expectedBudget = changedBudgetPerKey.containsKey(TEST_KEYS[i]) ?
-				changedBudgetPerKey.get(TEST_KEYS[i]) : TEST_BUDGETS[i];
-			assertThat(keyedBudgetManager.availableBudgetForKey(TEST_KEYS[i]), is(expectedBudget));
-			totalExpectedBudget += expectedBudget;
-		}
-		assertThat(keyedBudgetManager.maxTotalBudget(), is(LongStream.of(TEST_BUDGETS).sum()));
-		assertThat(keyedBudgetManager.totalAvailableBudget(), is(totalExpectedBudget));
-	}
-
-	private CompletableFuture<AcquisitionResult<String>> acquireForMultipleKeysAsync(long pageSize) {
-		return CompletableFuture.supplyAsync(() -> acquireForMultipleKeys(pageSize), NEW_THREAD_EXECUTOR);
-	}
-
-	private CompletableFuture<Long> getAvailableBudgetForKeysAsync() {
-		return CompletableFuture.supplyAsync(() -> keyedBudgetManager.availableBudgetForKeys(Arrays.asList("k2", "k3")), NEW_THREAD_EXECUTOR);
-	}
-
-	private AcquisitionResult<String> acquireForMultipleKeys(long pageSize) {
-		return keyedBudgetManager.acquirePagedBudgetForKeys(Arrays.asList("k2", "k3"), 4, pageSize);
-	}
-
-	private CompletableFuture<Void> releaseKeysAsync(Map<String, Long> sizeByKey) {
-		return CompletableFuture.runAsync(() -> keyedBudgetManager.releaseBudgetForKeys(sizeByKey), NEW_THREAD_EXECUTOR);
-	}
-
-	private static boolean checkFirstAcquisitionSucceeded(
-		Future<AcquisitionResult<String>> allocation1,
-		Future<AcquisitionResult<String>> allocation2) throws ExecutionException, InterruptedException {
-		return checkAcquisitionSuccess(allocation1.get(), 4L) && allocation2.get().isFailure();
-	}
-
-	private static boolean checkAcquisitionSuccess(
-		AcquisitionResult<String> acquired,
-		@SuppressWarnings("SameParameterValue") long numberOfPageToAcquire) {
-		return acquired.isSuccess() &&
-			acquired.getAcquiredPerKey().values().stream().mapToLong(b -> b).sum() == numberOfPageToAcquire;
-	}
-
-	private static KeyedBudgetManager<String> createSimpleKeyedBudget() {
-		return new KeyedBudgetManager<>(createdBudgetMap(TEST_KEYS, TEST_BUDGETS), 1L);
-	}
-
-	private static Map<String, Long> createdBudgetMap(String[] keys, long[] budgets) {
-		Preconditions.checkArgument(keys.length == budgets.length);
-		Map<String, Long> keydBudgets = new HashMap<>();
-		for (int i = 0; i < keys.length; i++) {
-			keydBudgets.put(keys[i], budgets[i]);
-		}
-		return keydBudgets;
-	}
-
-	private static void waitForFutureSilently(Future<?> future) {
-		try {
-			future.get();
-		} catch (InterruptedException | ExecutionException e) {
-			// silent
-		}
-	}
-}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 181f701..359f908 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
@@ -159,7 +158,7 @@ public class StreamMockEnvironment implements Environment {
 		this.taskConfiguration = taskConfig;
 		this.inputs = new LinkedList<InputGate>();
 		this.outputs = new LinkedList<ResultPartitionWriter>();
-		this.memManager = MemoryManagerBuilder.newBuilder().setMemorySize(MemoryType.OFF_HEAP, offHeapMemorySize).build();
+		this.memManager = MemoryManagerBuilder.newBuilder().setMemorySize(offHeapMemorySize).build();
 		this.ioManager = new IOManagerAsync();
 		this.taskStateManager = Preconditions.checkNotNull(taskStateManager);
 		this.aggregateManager = new TestGlobalAggregateManager();


[flink] 03/04: [FLINK-15758][MemManager] Release segment and its unsafe memory in GC Cleaner

Posted by az...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2a4520935c8b546ac05f664f947d143b06322000
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Tue Feb 4 17:34:20 2020 +0100

    [FLINK-15758][MemManager] Release segment and its unsafe memory in GC Cleaner
    
    After #9747, managed memory is allocated from UNSAFE, not as direct NIO buffers as before 1.10.
    Releasing a segment also released its underlying unsafe memory, which is dangerous in general
    as there can still be references to Java objects giving access to the released memory. If such a reference
    ever leaks, the illegal memory access can corrupt the memory of other code parts without even causing a segmentation fault.
    
    The solution is similar to how Java handles the direct memory limit:
    - the underlying byte buffers of segments are registered in a phantom reference queue with a Java GC cleaner which releases the unsafe memory
    - all allocations and reservations are managed by a memory limit and an atomic counter of available memory
    - if the available memory is not enough while reserving, processing of the phantom reference queue is triggered to run the cleaners which GC has hopefully already discovered
    - memory can be released directly or in a GC cleaner
    
    GC is also sped up by nulling out the byte buffer reference in `HybridMemorySegment#free`, as the buffer is inaccessible anyway after freeing.
    Otherwise, a lot of tests which accidentally hold references to memory segments would have to be fixed to not hold them.
    `MemoryManager#verifyEmpty` checks that everything can be GC'ed at the end of the tests and
    after slot closing in production, to detect memory leaks if any other references are held, e.g. from `HybridMemorySegment#wrap`.
    
    This closes #11109.
---
 .../flink/core/memory/HybridMemorySegment.java     |  27 +-
 .../flink/core/memory/MemorySegmentFactory.java    |  17 +-
 .../org/apache/flink/core/memory/MemoryUtils.java  |   7 +-
 .../apache/flink/util/JavaGcCleanerWrapper.java    | 413 ++++++++++++++-------
 .../flink/core/memory/CrossSegmentTypeTest.java    |   2 +-
 .../flink/core/memory/EndiannessAccessChecks.java  |   2 +-
 .../HybridOffHeapUnsafeMemorySegmentTest.java      |   4 +-
 .../flink/core/memory/MemorySegmentChecksTest.java |   4 +-
 .../core/memory/MemorySegmentUndersizedTest.java   |   4 +-
 .../core/memory/OperationsOnFreedSegmentTest.java  |   2 +-
 .../flink/util/JavaGcCleanerWrapperTest.java       |   2 +-
 .../apache/flink/runtime/memory/MemoryManager.java | 100 ++---
 .../flink/runtime/memory/UnsafeMemoryBudget.java   | 183 +++++++++
 .../flink/runtime/memory/MemoryManagerTest.java    |  48 +++
 .../runtime/memory/UnsafeMemoryBudgetTest.java     |  85 +++++
 15 files changed, 665 insertions(+), 235 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java b/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java
index 1693e9a..fb7a4ba 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java
@@ -56,11 +56,7 @@ public final class HybridMemorySegment extends MemorySegment {
 	 * released.
 	 */
 	@Nullable
-	private final ByteBuffer offHeapBuffer;
-
-	/** The cleaner is called to free the underlying native memory. */
-	@Nullable
-	private final Runnable cleaner;
+	private ByteBuffer offHeapBuffer;
 
 	/**
 	  * Creates a new memory segment that represents the memory backing the given direct byte buffer.
@@ -71,13 +67,11 @@ public final class HybridMemorySegment extends MemorySegment {
 	  *
 	  * @param buffer The byte buffer whose memory is represented by this memory segment.
 	  * @param owner The owner references by this memory segment.
-	  * @param cleaner optional action to run upon freeing the segment.
 	  * @throws IllegalArgumentException Thrown, if the given ByteBuffer is not direct.
 	  */
-	HybridMemorySegment(@Nonnull ByteBuffer buffer, @Nullable Object owner, @Nullable Runnable cleaner) {
+	HybridMemorySegment(@Nonnull ByteBuffer buffer, @Nullable Object owner) {
 		super(getByteBufferAddress(buffer), buffer.capacity(), owner);
 		this.offHeapBuffer = buffer;
-		this.cleaner = cleaner;
 	}
 
 	/**
@@ -91,13 +85,18 @@ public final class HybridMemorySegment extends MemorySegment {
 	HybridMemorySegment(byte[] buffer, Object owner) {
 		super(buffer, owner);
 		this.offHeapBuffer = null;
-		this.cleaner = null;
 	}
 
 	// -------------------------------------------------------------------------
 	//  MemorySegment operations
 	// -------------------------------------------------------------------------
 
+	@Override
+	public void free() {
+		super.free();
+		offHeapBuffer = null; // to enable GC of unsafe memory
+	}
+
 	/**
 	 * Gets the buffer that owns the memory of this memory segment.
 	 *
@@ -106,6 +105,8 @@ public final class HybridMemorySegment extends MemorySegment {
 	public ByteBuffer getOffHeapBuffer() {
 		if (offHeapBuffer != null) {
 			return offHeapBuffer;
+		} else if (isFreed()) {
+			throw new IllegalStateException("segment has been freed");
 		} else {
 			throw new IllegalStateException("Memory segment does not represent off heap memory");
 		}
@@ -134,14 +135,6 @@ public final class HybridMemorySegment extends MemorySegment {
 		}
 	}
 
-	@Override
-	public void free() {
-		super.free();
-		if (cleaner != null) {
-			cleaner.run();
-		}
-	}
-
 	// ------------------------------------------------------------------------
 	//  Random Access get() and put() methods
 	// ------------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
index 2751d9c..ee301a1 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
@@ -19,6 +19,7 @@
 package org.apache.flink.core.memory;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.slf4j.Logger;
@@ -37,6 +38,7 @@ import java.nio.ByteBuffer;
 @Internal
 public final class MemorySegmentFactory {
 	private static final Logger LOG = LoggerFactory.getLogger(MemorySegmentFactory.class);
+	private static final Runnable NO_OP = () -> {};
 
 	/**
 	 * Creates a new memory segment that targets the given heap memory region.
@@ -100,7 +102,12 @@ public final class MemorySegmentFactory {
 	 */
 	public static MemorySegment allocateUnpooledOffHeapMemory(int size, Object owner) {
 		ByteBuffer memory = allocateDirectMemory(size);
-		return new HybridMemorySegment(memory, owner, null);
+		return new HybridMemorySegment(memory, owner);
+	}
+
+	@VisibleForTesting
+	public static MemorySegment allocateOffHeapUnsafeMemory(int size) {
+		return allocateOffHeapUnsafeMemory(size, null, NO_OP);
 	}
 
 	private static ByteBuffer allocateDirectMemory(int size) {
@@ -131,12 +138,14 @@ public final class MemorySegmentFactory {
 	 *
 	 * @param size The size of the off-heap unsafe memory segment to allocate.
 	 * @param owner The owner to associate with the off-heap unsafe memory segment.
+	 * @param customCleanupAction A custom action to run upon calling GC cleaner.
 	 * @return A new memory segment, backed by off-heap unsafe memory.
 	 */
-	public static MemorySegment allocateOffHeapUnsafeMemory(int size, Object owner) {
+	public static MemorySegment allocateOffHeapUnsafeMemory(int size, Object owner, Runnable customCleanupAction) {
 		long address = MemoryUtils.allocateUnsafe(size);
 		ByteBuffer offHeapBuffer = MemoryUtils.wrapUnsafeMemoryWithByteBuffer(address, size);
-		return new HybridMemorySegment(offHeapBuffer, owner, MemoryUtils.createMemoryGcCleaner(offHeapBuffer, address));
+		MemoryUtils.createMemoryGcCleaner(offHeapBuffer, address, customCleanupAction);
+		return new HybridMemorySegment(offHeapBuffer, owner);
 	}
 
 	/**
@@ -150,7 +159,7 @@ public final class MemorySegmentFactory {
 	 * @return A new memory segment representing the given off-heap memory.
 	 */
 	public static MemorySegment wrapOffHeapMemory(ByteBuffer memory) {
-		return new HybridMemorySegment(memory, null, null);
+		return new HybridMemorySegment(memory, null);
 	}
 
 }
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemoryUtils.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemoryUtils.java
index 7f6508c..34cac43 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemoryUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemoryUtils.java
@@ -107,8 +107,11 @@ public class MemoryUtils {
 	 * @param address address of the unsafe memory to release
 	 * @return action to run to release the unsafe memory manually
 	 */
-	static Runnable createMemoryGcCleaner(Object owner, long address) {
-		return JavaGcCleanerWrapper.create(owner, () -> releaseUnsafe(address));
+	static Runnable createMemoryGcCleaner(Object owner, long address, Runnable customCleanup) {
+		return JavaGcCleanerWrapper.createCleaner(owner, () -> {
+			releaseUnsafe(address);
+			customCleanup.run();
+		});
 	}
 
 	private static void releaseUnsafe(long address) {
diff --git a/flink-core/src/main/java/org/apache/flink/util/JavaGcCleanerWrapper.java b/flink-core/src/main/java/org/apache/flink/util/JavaGcCleanerWrapper.java
index becd028..ae8edd3 100644
--- a/flink-core/src/main/java/org/apache/flink/util/JavaGcCleanerWrapper.java
+++ b/flink-core/src/main/java/org/apache/flink/util/JavaGcCleanerWrapper.java
@@ -21,10 +21,14 @@ package org.apache.flink.util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Optional;
+import java.util.function.Supplier;
 
 /**
  * Java GC Cleaner wrapper.
@@ -43,109 +47,251 @@ public enum JavaGcCleanerWrapper {
 	private static final Logger LOG = LoggerFactory.getLogger(JavaGcCleanerWrapper.class);
 
 	private static final Collection<CleanerProvider> CLEANER_PROVIDERS =
-		Arrays.asList(LegacyCleanerProvider.INSTANCE, Java9CleanerProvider.INSTANCE);
-	private static final CleanerFactory CLEANER_FACTORY = findGcCleaner();
+		Arrays.asList(createLegacyCleanerProvider(), createJava9CleanerProvider());
+	private static final CleanerManager CLEANER_MANAGER = findGcCleanerManager();
+
+	private static CleanerProvider createLegacyCleanerProvider() {
+		String name = "Legacy (before Java 9) cleaner";
+		ReflectionUtils reflectionUtils = new ReflectionUtils(name + " provider");
+		String cleanerClassName = "sun.misc.Cleaner";
+
+		// Actual Legacy code under the hood:
+		//
+		// public static Runnable createCleaner(Object owner, Runnable cleanOperation) {
+		//     sun.misc.Cleaner jvmCleaner = sun.misc.Cleaner.create(owner, cleanOperation);
+		//     return () -> jvmCleaner.clean();
+		// }
+		//
+		// public static boolean tryRunPendingCleaners() throws InterruptedException {
+		//     sun.misc.JavaLangRefAccess javaLangRefAccess = sun.misc.SharedSecrets.getJavaLangRefAccess();
+		//	   return javaLangRefAccess.tryHandlePendingReference();
+		// }
+		//
+		return new CleanerProvider(
+			name,
+			new CleanerFactoryProvider(
+				name,
+				reflectionUtils,
+				cleanerClassName,
+				Optional::empty, // there is no Cleaner object, static method of its class will be called to create it
+				"create", // static method of Cleaner class to create it
+				cleanerClassName, // Cleaner is Cleanable in this case
+				"clean"),
+			new PendingCleanersRunnerProvider(
+				name,
+				reflectionUtils,
+				"sun.misc.SharedSecrets",
+				"sun.misc.JavaLangRefAccess",
+				"getJavaLangRefAccess",
+				"tryHandlePendingReference"));
+	}
 
-	private static CleanerFactory findGcCleaner() {
-		CleanerFactory foundCleanerFactory = null;
+	private static CleanerProvider createJava9CleanerProvider() {
+		String name = "New Java 9+ cleaner";
+		ReflectionUtils reflectionUtils = new ReflectionUtils(name + " provider");
+		String cleanerClassName = "java.lang.ref.Cleaner";
+
+		// Actual Java 9+ code under the hood:
+		//
+		// public static Runnable createCleaner(Object owner, Runnable cleanOperation) {
+		//     java.lang.ref.Cleaner jvmCleaner = java.lang.ref.Cleaner.create();
+		//     java.lang.ref.Cleaner.Cleanable cleanable = jvmCleaner.register(owner, cleanOperation);
+		//     return () -> cleanable.clean();
+		// }
+		//
+		// public static boolean tryRunPendingCleaners() throws InterruptedException {
+		//     jdk.internal.misc.JavaLangRefAccess javaLangRefAccess = jdk.internal.misc.SharedSecrets.getJavaLangRefAccess();
+		//	   return javaLangRefAccess.waitForReferenceProcessing();
+		// }
+		//
+		return new CleanerProvider(
+			name,
+			new CleanerFactoryProvider(
+				name,
+				reflectionUtils,
+				cleanerClassName,
+				() -> {
+					Class<?> cleanerClass = reflectionUtils.findClass(cleanerClassName);
+					Method cleanerCreateMethod = reflectionUtils.findMethod(cleanerClass, "create");
+					try {
+						return Optional.of(cleanerCreateMethod.invoke(null));
+					} catch (IllegalAccessException | InvocationTargetException e) {
+						throw new FlinkRuntimeException("Failed to create a Java 9 Cleaner", e);
+					}
+				},
+				"register",
+				"java.lang.ref.Cleaner$Cleanable",
+				"clean"),
+			new PendingCleanersRunnerProvider(
+				name,
+				reflectionUtils,
+				"jdk.internal.misc.SharedSecrets",
+				"jdk.internal.misc.JavaLangRefAccess",
+				"getJavaLangRefAccess",
+				"waitForReferenceProcessing"));
+	}
+
+	private static CleanerManager findGcCleanerManager() {
+		CleanerManager foundCleanerManager = null;
 		Throwable t = null;
 		for (CleanerProvider cleanerProvider : CLEANER_PROVIDERS) {
-			//noinspection OverlyBroadCatchBlock
 			try {
-				foundCleanerFactory = cleanerProvider.createCleanerFactory();
+				foundCleanerManager = cleanerProvider.createCleanerManager();
 				break;
 			} catch (Throwable e) {
 				t = ExceptionUtils.firstOrSuppressed(e, t);
 			}
 		}
 
-		if (foundCleanerFactory == null) {
+		if (foundCleanerManager == null) {
 			String errorMessage = String.format("Failed to find GC Cleaner among available providers: %s", CLEANER_PROVIDERS);
 			throw new Error(errorMessage, t);
 		}
-		return foundCleanerFactory;
-	}
-
-	public static Runnable create(Object owner, Runnable cleanOperation) {
-		return CLEANER_FACTORY.create(owner, cleanOperation);
+		return foundCleanerManager;
 	}
 
-	@FunctionalInterface
-	private interface CleanerProvider {
-		CleanerFactory createCleanerFactory() throws ClassNotFoundException;
+	public static Runnable createCleaner(Object owner, Runnable cleanOperation) {
+		return CLEANER_MANAGER.create(owner, cleanOperation);
 	}
 
-	@FunctionalInterface
-	private interface CleanerFactory {
-		Runnable create(Object owner, Runnable cleanOperation);
+	public static boolean tryRunPendingCleaners() throws InterruptedException {
+		return CLEANER_MANAGER.tryRunPendingCleaners();
 	}
 
-	private enum LegacyCleanerProvider implements CleanerProvider {
-		INSTANCE;
+	private static class CleanerProvider {
+		private final String cleanerName;
+		private final CleanerFactoryProvider cleanerFactoryProvider;
+		private final PendingCleanersRunnerProvider pendingCleanersRunnerProvider;
+
+		private CleanerProvider(
+				String cleanerName,
+				CleanerFactoryProvider cleanerFactoryProvider,
+				PendingCleanersRunnerProvider pendingCleanersRunnerProvider) {
+			this.cleanerName = cleanerName;
+			this.cleanerFactoryProvider = cleanerFactoryProvider;
+			this.pendingCleanersRunnerProvider = pendingCleanersRunnerProvider;
+		}
 
-		private static final String LEGACY_CLEANER_CLASS_NAME = "sun.misc.Cleaner";
+		private CleanerManager createCleanerManager() {
+			return new CleanerManager(
+				cleanerName,
+				cleanerFactoryProvider.createCleanerFactory(),
+				pendingCleanersRunnerProvider.createPendingCleanersRunner());
+		}
 
 		@Override
-		public CleanerFactory createCleanerFactory() {
-			Class<?> cleanerClass = findCleanerClass();
-			Method cleanerCreateMethod = getCleanerCreateMethod(cleanerClass);
-			Method cleanerCleanMethod = getCleanerCleanMethod(cleanerClass);
-			return new LegacyCleanerFactory(cleanerCreateMethod, cleanerCleanMethod);
+		public String toString() {
+			return cleanerName + " provider";
 		}
+	}
 
-		private static Class<?> findCleanerClass() {
-			try {
-				return Class.forName(LEGACY_CLEANER_CLASS_NAME);
-			} catch (ClassNotFoundException e) {
-				throw new FlinkRuntimeException("Failed to find Java legacy Cleaner class", e);
-			}
+	private static class CleanerManager {
+		private final String cleanerName;
+		private final CleanerFactory cleanerFactory;
+		private final PendingCleanersRunner pendingCleanersRunner;
+
+		private CleanerManager(
+				String cleanerName,
+				CleanerFactory cleanerFactory,
+				PendingCleanersRunner pendingCleanersRunner) {
+			this.cleanerName = cleanerName;
+			this.cleanerFactory = cleanerFactory;
+			this.pendingCleanersRunner = pendingCleanersRunner;
 		}
 
-		private static Method getCleanerCreateMethod(Class<?> cleanerClass) {
-			try {
-				return cleanerClass.getMethod("create", Object.class, Runnable.class);
-			} catch (NoSuchMethodException e) {
-				throw new FlinkRuntimeException("Failed to find Java legacy Cleaner#create method", e);
-			}
+		private Runnable create(Object owner, Runnable cleanOperation) {
+			return cleanerFactory.create(owner, cleanOperation);
 		}
 
-		private static Method getCleanerCleanMethod(Class<?> cleanerClass) {
-			try {
-				return cleanerClass.getMethod("clean");
-			} catch (NoSuchMethodException e) {
-				throw new FlinkRuntimeException("Failed to find Java legacy Cleaner#clean method", e);
-			}
+		private boolean tryRunPendingCleaners() throws InterruptedException {
+			return pendingCleanersRunner.tryRunPendingCleaners();
 		}
 
 		@Override
 		public String toString() {
-			return "Legacy cleaner provider before Java 9 using " + LEGACY_CLEANER_CLASS_NAME;
+			return cleanerName + " manager";
 		}
 	}
 
-	private static final class LegacyCleanerFactory implements CleanerFactory {
-		private final Method cleanerCreateMethod;
-		private final Method cleanerCleanMethod;
+	private static class CleanerFactoryProvider {
+		private final String cleanerName;
+		private final ReflectionUtils reflectionUtils;
+		private final String cleanerClassName;
+		private final Supplier<Optional<Object>> cleanerSupplier;
+		private final String cleanableCreationMethodName;
+		private final String cleanableClassName;
+		private final String cleanMethodName;
+
+		private CleanerFactoryProvider(
+				String cleanerName,
+				ReflectionUtils reflectionUtils,
+				String cleanerClassName,
+				Supplier<Optional<Object>> cleanerSupplier,
+				String cleanableCreationMethodName, // Cleaner is a factory for Cleanable
+				String cleanableClassName,
+				String cleanMethodName) {
+			this.cleanerName = cleanerName;
+			this.reflectionUtils = reflectionUtils;
+			this.cleanerClassName = cleanerClassName;
+			this.cleanerSupplier = cleanerSupplier;
+			this.cleanableCreationMethodName = cleanableCreationMethodName;
+			this.cleanableClassName = cleanableClassName;
+			this.cleanMethodName = cleanMethodName;
+		}
 
-		private LegacyCleanerFactory(Method cleanerCreateMethod, Method cleanerCleanMethod) {
-			this.cleanerCreateMethod = cleanerCreateMethod;
-			this.cleanerCleanMethod = cleanerCleanMethod;
+		private CleanerFactory createCleanerFactory() {
+			Class<?> cleanerClass = reflectionUtils.findClass(cleanerClassName);
+			Method cleanableCreationMethod = reflectionUtils.findMethod(
+				cleanerClass,
+				cleanableCreationMethodName,
+				Object.class,
+				Runnable.class);
+			Class<?> cleanableClass = reflectionUtils.findClass(cleanableClassName);
+			Method cleanMethod = reflectionUtils.findMethod(cleanableClass, cleanMethodName);
+			return new CleanerFactory(
+				cleanerName,
+				cleanerSupplier.get().orElse(null), // static method of Cleaner class will be called to create Cleanable
+				cleanableCreationMethod,
+				cleanMethod);
 		}
 
 		@Override
-		public Runnable create(Object owner, Runnable cleanupOperation) {
-			Object cleaner;
+		public String toString() {
+			return cleanerName + " factory provider using " + cleanerClassName;
+		}
+	}
+
+	private static class CleanerFactory {
+		private final String cleanerName;
+		@Nullable
+		private final Object cleaner;
+		private final Method cleanableCreationMethod;
+		private final Method cleanMethod;
+
+		private CleanerFactory(
+			String cleanerName,
+			@Nullable Object cleaner,
+			Method cleanableCreationMethod,
+			Method cleanMethod) {
+			this.cleanerName = cleanerName;
+			this.cleaner = cleaner;
+			this.cleanableCreationMethod = cleanableCreationMethod;
+			this.cleanMethod = cleanMethod;
+		}
+
+		private Runnable create(Object owner, Runnable cleanupOperation) {
+			Object cleanable;
 			try {
-				cleaner = cleanerCreateMethod.invoke(null, owner, cleanupOperation);
+				cleanable = cleanableCreationMethod.invoke(cleaner, owner, cleanupOperation);
 			} catch (IllegalAccessException | InvocationTargetException e) {
-				throw new Error("Failed to create a Java legacy Cleaner", e);
+				throw new Error("Failed to create a " + cleanerName, e);
 			}
 			String ownerString = owner.toString(); // lambda should not capture the owner object
 			return () -> {
 				try {
-					cleanerCleanMethod.invoke(cleaner);
+					cleanMethod.invoke(cleanable);
 				} catch (IllegalAccessException | InvocationTargetException e) {
-					String message = String.format("FATAL UNEXPECTED - Failed to invoke a Java legacy Cleaner for %s", ownerString);
+					String message = String.format("FATAL UNEXPECTED - Failed to invoke a %s for %s", cleanerName, ownerString);
 					LOG.error(message, e);
 					throw new Error(message, e);
 				}
@@ -153,106 +299,115 @@ public enum JavaGcCleanerWrapper {
 		}
 	}
 
-	/** New cleaner provider for Java 9+. */
-	private enum Java9CleanerProvider implements CleanerProvider {
-		INSTANCE;
+	private static class PendingCleanersRunnerProvider {
+		private final String cleanerName;
+		private final ReflectionUtils reflectionUtils;
+		private final String sharedSecretsClassName;
+		private final String javaLangRefAccessClassName;
+		private final String getJavaLangRefAccessName;
+		private final String tryHandlePendingReferenceName;
+
+		private PendingCleanersRunnerProvider(
+				String cleanerName,
+				ReflectionUtils reflectionUtils,
+				String sharedSecretsClassName,
+				String javaLangRefAccessClassName,
+				String getJavaLangRefAccessName,
+				String tryHandlePendingReferenceName) {
+			this.cleanerName = cleanerName;
+			this.reflectionUtils = reflectionUtils;
+			this.sharedSecretsClassName = sharedSecretsClassName;
+			this.javaLangRefAccessClassName = javaLangRefAccessClassName;
+			this.getJavaLangRefAccessName = getJavaLangRefAccessName;
+			this.tryHandlePendingReferenceName = tryHandlePendingReferenceName;
+		}
 
-		private static final String JAVA9_CLEANER_CLASS_NAME = "java.lang.ref.Cleaner";
+		private PendingCleanersRunner createPendingCleanersRunner() {
+			Class<?> sharedSecretsClass = reflectionUtils.findClass(sharedSecretsClassName);
+			Class<?> javaLangRefAccessClass = reflectionUtils.findClass(javaLangRefAccessClassName);
+			Method getJavaLangRefAccessMethod = reflectionUtils.findMethod(sharedSecretsClass, getJavaLangRefAccessName);
+			Method tryHandlePendingReferenceMethod = reflectionUtils.findMethod(
+				javaLangRefAccessClass,
+				tryHandlePendingReferenceName);
+			return new PendingCleanersRunner(getJavaLangRefAccessMethod, tryHandlePendingReferenceMethod);
+		}
 
 		@Override
-		public CleanerFactory createCleanerFactory() {
-			Class<?> cleanerClass = findCleanerClass();
-			Method cleanerCreateMethod = getCleanerCreateMethod(cleanerClass);
-			Object cleaner = createCleaner(cleanerCreateMethod);
-			Method cleanerRegisterMethod = getCleanerRegisterMethod(cleanerClass);
-			Class<?> cleanableClass = findCleanableClass();
-			Method cleanMethod = getCleanMethod(cleanableClass);
-			return new Java9CleanerFactory(cleaner, cleanerRegisterMethod, cleanMethod);
+		public String toString() {
+			return "Pending " + cleanerName + "s runner provider";
 		}
+	}
 
-		private static Class<?> findCleanerClass() {
-			try {
-				return Class.forName(JAVA9_CLEANER_CLASS_NAME);
-			} catch (ClassNotFoundException e) {
-				throw new FlinkRuntimeException("Failed to find Java 9 Cleaner class", e);
-			}
-		}
+	private static class PendingCleanersRunner {
+		private final Method getJavaLangRefAccessMethod;
+		private final Method waitForReferenceProcessingMethod;
 
-		private static Method getCleanerCreateMethod(Class<?> cleanerClass) {
-			try {
-				return cleanerClass.getMethod("create");
-			} catch (NoSuchMethodException e) {
-				throw new FlinkRuntimeException("Failed to find Java 9 Cleaner#create method", e);
-			}
+		private PendingCleanersRunner(Method getJavaLangRefAccessMethod, Method waitForReferenceProcessingMethod) {
+			this.getJavaLangRefAccessMethod = getJavaLangRefAccessMethod;
+			this.waitForReferenceProcessingMethod = waitForReferenceProcessingMethod;
 		}
 
-		private static Object createCleaner(Method cleanerCreateMethod) {
+		private boolean tryRunPendingCleaners() throws InterruptedException {
+			Object javaLangRefAccess = getJavaLangRefAccess();
 			try {
-				return cleanerCreateMethod.invoke(null);
+				return (Boolean) waitForReferenceProcessingMethod.invoke(javaLangRefAccess);
 			} catch (IllegalAccessException | InvocationTargetException e) {
-				throw new FlinkRuntimeException("Failed to create a Java 9 Cleaner", e);
-			}
-		}
-
-		private static Method getCleanerRegisterMethod(Class<?> cleanerClass) {
-			try {
-				return cleanerClass.getMethod("register", Object.class, Runnable.class);
-			} catch (NoSuchMethodException e) {
-				throw new FlinkRuntimeException("Failed to find Java 9 Cleaner#create method", e);
+				throwIfCauseIsInterruptedException(e);
+				return throwInvocationError(e, javaLangRefAccess, waitForReferenceProcessingMethod);
 			}
 		}
 
-		private static Class<?> findCleanableClass() {
+		private Object getJavaLangRefAccess() {
 			try {
-				return Class.forName("java.lang.ref.Cleaner$Cleanable");
-			} catch (ClassNotFoundException e) {
-				throw new FlinkRuntimeException("Failed to find Java 9 Cleaner#Cleanable class", e);
+				return getJavaLangRefAccessMethod.invoke(null); // null receiver: invoked as a static method
+			} catch (IllegalAccessException | InvocationTargetException e) {
+				return throwInvocationError(e, null, getJavaLangRefAccessMethod); // report the accessor that failed
 			}
 		}
 
-		private static Method getCleanMethod(Class<?> cleanableClass) {
-			try {
-				return cleanableClass.getMethod("clean");
-			} catch (NoSuchMethodException e) {
-				throw new FlinkRuntimeException("Failed to find Java 9 Cleaner$Cleanable#clean method", e);
+		private static void throwIfCauseIsInterruptedException(Throwable t) throws InterruptedException {
+			// if the original wrapped method can throw InterruptedException
+			// then we may want to explicitly handle in the user code for certain implementations
+			if (t.getCause() instanceof InterruptedException) {
+				throw (InterruptedException) t.getCause();
 			}
 		}
 
-		@Override
-		public String toString() {
-			return "New cleaner provider for Java 9+" + JAVA9_CLEANER_CLASS_NAME;
+		private static <T> T throwInvocationError(Throwable t, @Nullable Object obj, Method method) {
+			String message = String.format(
+				"FATAL UNEXPECTED - Failed to invoke %s%s",
+				obj == null ? "" : obj.getClass().getSimpleName() + '#',
+				method.getName());
+			LOG.error(message, t);
+			throw new Error(message, t);
 		}
 	}
 
-	private static final class Java9CleanerFactory implements CleanerFactory {
-		private final Object cleaner;
-		private final Method cleanerRegisterMethod;
-		private final Method cleanMethod;
+	private static class ReflectionUtils {
+		private final String logPrefix;
 
-		private Java9CleanerFactory(Object cleaner, Method cleanerRegisterMethod, Method cleanMethod) {
-			this.cleaner = cleaner;
-			this.cleanerRegisterMethod = cleanerRegisterMethod;
-			this.cleanMethod = cleanMethod;
+		private ReflectionUtils(String logPrefix) {
+			this.logPrefix = logPrefix;
 		}
 
-		@Override
-		public Runnable create(Object owner, Runnable cleanupOperation) {
-			Object cleanable;
+		private Class<?> findClass(String className) {
 			try {
-				cleanable = cleanerRegisterMethod.invoke(cleaner, owner, cleanupOperation);
-			} catch (IllegalAccessException | InvocationTargetException e) {
-				throw new Error("Failed to create a Java 9 Cleaner", e);
+				return Class.forName(className);
+			} catch (ClassNotFoundException e) {
+				throw new FlinkRuntimeException(
+					String.format("%s: Failed to find %s class", logPrefix, className), // fully qualified name
+					e);
+			}
+		}
+
+		private Method findMethod(Class<?> cl, String methodName, Class<?>... parameterTypes) {
+			try {
+				return cl.getMethod(methodName, parameterTypes);
+			} catch (NoSuchMethodException e) {
+				throw new FlinkRuntimeException(
+					String.format("%s: Failed to find %s#%s method", logPrefix, cl.getSimpleName(), methodName),
+					e);
 			}
-			String ownerString = owner.toString(); // lambda should not capture the owner object
-			return () -> {
-				try {
-					cleanMethod.invoke(cleanable);
-				} catch (IllegalAccessException | InvocationTargetException e) {
-					String message = String.format("FATAL UNEXPECTED - Failed to invoke a Java 9 Cleaner$Cleanable for %s", ownerString);
-					LOG.error(message, e);
-					throw new Error(message, e);
-				}
-			};
 		}
 	}
 }
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/CrossSegmentTypeTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/CrossSegmentTypeTest.java
index 51804d0..056c5ee 100644
--- a/flink-core/src/test/java/org/apache/flink/core/memory/CrossSegmentTypeTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/CrossSegmentTypeTest.java
@@ -159,7 +159,7 @@ public class CrossSegmentTypeTest {
 			new HeapMemorySegment(new byte[size]),
 			MemorySegmentFactory.allocateUnpooledSegment(size),
 			MemorySegmentFactory.allocateUnpooledOffHeapMemory(size),
-			MemorySegmentFactory.allocateOffHeapUnsafeMemory(size, null)
+			MemorySegmentFactory.allocateOffHeapUnsafeMemory(size)
 		};
 		return segments;
 	}
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/EndiannessAccessChecks.java b/flink-core/src/test/java/org/apache/flink/core/memory/EndiannessAccessChecks.java
index c2db44e..6e81b8e 100644
--- a/flink-core/src/test/java/org/apache/flink/core/memory/EndiannessAccessChecks.java
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/EndiannessAccessChecks.java
@@ -47,7 +47,7 @@ public class EndiannessAccessChecks {
 
 	@Test
 	public void testHybridOffHeapUnsafeSegment() {
-		testBigAndLittleEndianAccessUnaligned(MemorySegmentFactory.allocateOffHeapUnsafeMemory(11111, null));
+		testBigAndLittleEndianAccessUnaligned(MemorySegmentFactory.allocateOffHeapUnsafeMemory(11111));
 	}
 
 	private void testBigAndLittleEndianAccessUnaligned(MemorySegment segment) {
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/HybridOffHeapUnsafeMemorySegmentTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/HybridOffHeapUnsafeMemorySegmentTest.java
index f167203..e5c70cf 100644
--- a/flink-core/src/test/java/org/apache/flink/core/memory/HybridOffHeapUnsafeMemorySegmentTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/HybridOffHeapUnsafeMemorySegmentTest.java
@@ -33,11 +33,11 @@ public class HybridOffHeapUnsafeMemorySegmentTest extends HybridOffHeapMemorySeg
 
 	@Override
 	MemorySegment createSegment(int size) {
-		return MemorySegmentFactory.allocateOffHeapUnsafeMemory(size, null);
+		return MemorySegmentFactory.allocateOffHeapUnsafeMemory(size);
 	}
 
 	@Override
 	MemorySegment createSegment(int size, Object owner) {
-		return MemorySegmentFactory.allocateOffHeapUnsafeMemory(size, owner);
+		return MemorySegmentFactory.allocateOffHeapUnsafeMemory(size, owner, () -> {});
 	}
 }
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentChecksTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentChecksTest.java
index 3e3e267..09619cd 100644
--- a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentChecksTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentChecksTest.java
@@ -46,12 +46,12 @@ public class MemorySegmentChecksTest {
 
 	@Test(expected = NullPointerException.class)
 	public void testHybridOffHeapNullBuffer2() {
-		new HybridMemorySegment(null, new Object(), () -> {});
+		new HybridMemorySegment((ByteBuffer) null, new Object());
 	}
 
 	@Test(expected = IllegalArgumentException.class)
 	public void testHybridNonDirectBuffer() {
-		new HybridMemorySegment(ByteBuffer.allocate(1024), new Object(), () -> {});
+		new HybridMemorySegment(ByteBuffer.allocate(1024), new Object());
 	}
 
 	@Test(expected = IllegalArgumentException.class)
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentUndersizedTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentUndersizedTest.java
index 1363703..1566ac0 100644
--- a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentUndersizedTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentUndersizedTest.java
@@ -63,7 +63,7 @@ public class MemorySegmentUndersizedTest {
 
 	@Test
 	public void testZeroSizeOffHeapUnsafeHybridSegment() {
-		MemorySegment segment = MemorySegmentFactory.allocateOffHeapUnsafeMemory(0, null);
+		MemorySegment segment = MemorySegmentFactory.allocateOffHeapUnsafeMemory(0);
 
 		testZeroSizeBuffer(segment);
 		testSegmentWithSizeLargerZero(segment);
@@ -86,7 +86,7 @@ public class MemorySegmentUndersizedTest {
 
 	@Test
 	public void testSizeOneOffHeapUnsafeHybridSegment() {
-		testSegmentWithSizeLargerZero(MemorySegmentFactory.allocateOffHeapUnsafeMemory(1, null));
+		testSegmentWithSizeLargerZero(MemorySegmentFactory.allocateOffHeapUnsafeMemory(1));
 	}
 
 	private static void testZeroSizeBuffer(MemorySegment segment) {
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/OperationsOnFreedSegmentTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/OperationsOnFreedSegmentTest.java
index bf27fc1..0013399 100644
--- a/flink-core/src/test/java/org/apache/flink/core/memory/OperationsOnFreedSegmentTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/OperationsOnFreedSegmentTest.java
@@ -128,7 +128,7 @@ public class OperationsOnFreedSegmentTest {
 		MemorySegment heap = new HeapMemorySegment(new byte[PAGE_SIZE]);
 		MemorySegment hybridHeap = MemorySegmentFactory.wrap(new byte[PAGE_SIZE]);
 		MemorySegment hybridOffHeap = MemorySegmentFactory.allocateUnpooledOffHeapMemory(PAGE_SIZE);
-		MemorySegment hybridOffHeapUnsafe = MemorySegmentFactory.allocateOffHeapUnsafeMemory(PAGE_SIZE, null);
+		MemorySegment hybridOffHeapUnsafe = MemorySegmentFactory.allocateOffHeapUnsafeMemory(PAGE_SIZE);
 
 		MemorySegment[] segments = { heap, hybridHeap, hybridOffHeap, hybridOffHeapUnsafe };
 
diff --git a/flink-core/src/test/java/org/apache/flink/util/JavaGcCleanerWrapperTest.java b/flink-core/src/test/java/org/apache/flink/util/JavaGcCleanerWrapperTest.java
index ead8fce..1785003 100644
--- a/flink-core/src/test/java/org/apache/flink/util/JavaGcCleanerWrapperTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/JavaGcCleanerWrapperTest.java
@@ -32,7 +32,7 @@ public class JavaGcCleanerWrapperTest {
 	@Test
 	public void testCleanOperationRunsOnlyOnceEitherOnGcOrExplicitly() throws InterruptedException {
 		AtomicInteger callCounter = new AtomicInteger();
-		Runnable cleaner = JavaGcCleanerWrapper.create(new Object(), callCounter::incrementAndGet);
+		Runnable cleaner = JavaGcCleanerWrapper.createCleaner(new Object(), callCounter::incrementAndGet);
 		System.gc(); // not guaranteed to be run always but should in practice
 		Thread.sleep(10); // more chance for GC to run
 		cleaner.run();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
index e9c7b30..948053a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
@@ -29,7 +29,6 @@ import org.apache.flink.util.function.ThrowingRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nonnegative;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
@@ -42,7 +41,6 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 
@@ -77,11 +75,9 @@ public class MemoryManager {
 
 	private final long pageSize;
 
-	private final long totalMemorySize;
-
 	private final long totalNumberOfPages;
 
-	private final AtomicLong availableMemorySize;
+	private final UnsafeMemoryBudget memoryBudget;
 
 	private final SharedResources sharedResources;
 
@@ -98,13 +94,12 @@ public class MemoryManager {
 		sanityCheck(memorySize, pageSize);
 
 		this.pageSize = pageSize;
-		this.totalMemorySize = memorySize;
+		this.memoryBudget = new UnsafeMemoryBudget(memorySize);
 		this.totalNumberOfPages = memorySize / pageSize;
 		this.allocatedSegments = new ConcurrentHashMap<>();
 		this.reservedMemory = new ConcurrentHashMap<>();
-		this.availableMemorySize = new AtomicLong(totalMemorySize);
 		this.sharedResources = new SharedResources();
-		verifyIntTotalNumberOfPages(totalMemorySize, totalNumberOfPages);
+		verifyIntTotalNumberOfPages(memorySize, totalNumberOfPages);
 
 		LOG.debug(
 			"Initialized MemoryManager with total memory size {} and page size {}.",
@@ -146,7 +141,6 @@ public class MemoryManager {
 			// mark as shutdown and release memory
 			isShutDown = true;
 			reservedMemory.clear();
-			availableMemorySize.set(totalMemorySize);
 
 			// go over all allocated segments and release them
 			for (Set<MemorySegment> segments : allocatedSegments.values()) {
@@ -175,7 +169,7 @@ public class MemoryManager {
 	 * @return True, if the memory manager is empty and valid, false if it is not empty or corrupted.
 	 */
 	public boolean verifyEmpty() {
-		return availableMemorySize.get() == totalMemorySize;
+		return memoryBudget.verifyEmpty();
 	}
 
 	// ------------------------------------------------------------------------
@@ -230,16 +224,17 @@ public class MemoryManager {
 
 		long memoryToReserve = numberOfPages * pageSize;
 		try {
-			reserveMemory(memoryToReserve);
+			memoryBudget.reserveMemory(memoryToReserve);
 		} catch (MemoryReservationException e) {
 			throw new MemoryAllocationException(String.format("Could not allocate %d pages", numberOfPages), e);
 		}
 
+		Runnable pageCleanup = this::releasePage;
 		allocatedSegments.compute(owner, (o, currentSegmentsForOwner) -> {
 			Set<MemorySegment> segmentsForOwner = currentSegmentsForOwner == null ?
 				new HashSet<>(numberOfPages) : currentSegmentsForOwner;
 			for (long i = numberOfPages; i > 0; i--) {
-				MemorySegment segment = allocateOffHeapUnsafeMemory(getPageSize(), owner);
+				MemorySegment segment = allocateOffHeapUnsafeMemory(getPageSize(), owner, pageCleanup);
 				target.add(segment);
 				segmentsForOwner.add(segment);
 			}
@@ -249,6 +244,10 @@ public class MemoryManager {
 		Preconditions.checkState(!isShutDown, "Memory manager has been concurrently shut down.");
 	}
 
+	private void releasePage() {
+		memoryBudget.releaseMemory(getPageSize());
+	}
+
 	/**
 	 * Tries to release the memory for the specified segment.
 	 *
@@ -270,9 +269,7 @@ public class MemoryManager {
 		try {
 			allocatedSegments.computeIfPresent(segment.getOwner(), (o, segsForOwner) -> {
 				segment.free();
-				if (segsForOwner.remove(segment)) {
-					releaseMemory(getPageSize());
-				}
+				segsForOwner.remove(segment);
 				return segsForOwner.isEmpty() ? null : segsForOwner;
 			});
 		}
@@ -296,8 +293,6 @@ public class MemoryManager {
 
 		Preconditions.checkState(!isShutDown, "Memory manager has been shut down.");
 
-		AtomicLong releasedMemory = new AtomicLong(0L);
-
 		// since concurrent modifications to the collection
 		// can disturb the release, we need to try potentially multiple times
 		boolean successfullyReleased = false;
@@ -316,7 +311,7 @@ public class MemoryManager {
 					segment = segmentsIterator.next();
 				}
 				while (segment != null) {
-					segment = releaseSegmentsForOwnerUntilNextOwner(segment, segmentsIterator, releasedMemory);
+					segment = releaseSegmentsForOwnerUntilNextOwner(segment, segmentsIterator);
 				}
 				segments.clear();
 				// the only way to exit the loop
@@ -326,18 +321,15 @@ public class MemoryManager {
 				// call releases the memory. fall through the loop and try again
 			}
 		} while (!successfullyReleased);
-
-		releaseMemory(releasedMemory.get());
 	}
 
 	private MemorySegment releaseSegmentsForOwnerUntilNextOwner(
 			MemorySegment firstSeg,
-			Iterator<MemorySegment> segmentsIterator,
-			AtomicLong releasedMemory) {
+			Iterator<MemorySegment> segmentsIterator) {
 		AtomicReference<MemorySegment> nextOwnerMemorySegment = new AtomicReference<>();
 		Object owner = firstSeg.getOwner();
 		allocatedSegments.compute(owner, (o, segsForOwner) -> {
-			releasedMemory.addAndGet(freeSegment(firstSeg, segsForOwner));
+			freeSegment(firstSeg, segsForOwner);
 			while (segmentsIterator.hasNext()) {
 				MemorySegment segment = segmentsIterator.next();
 				try {
@@ -349,7 +341,7 @@ public class MemoryManager {
 						nextOwnerMemorySegment.set(segment);
 						break;
 					}
-					releasedMemory.addAndGet(freeSegment(segment, segsForOwner));
+					freeSegment(segment, segsForOwner);
 				} catch (Throwable t) {
 					throw new RuntimeException(
 						"Error removing book-keeping reference to allocated memory segment.", t);
@@ -360,9 +352,11 @@ public class MemoryManager {
 		return nextOwnerMemorySegment.get();
 	}
 
-	private long freeSegment(MemorySegment segment, @Nullable Collection<MemorySegment> segments) {
+	private static void freeSegment(MemorySegment segment, @Nullable Collection<MemorySegment> segments) {
 		segment.free();
-		return segments != null && segments.remove(segment) ? getPageSize() : 0L;
+		if (segments != null) {
+			segments.remove(segment);
+		}
 	}
 
 	/**
@@ -386,12 +380,9 @@ public class MemoryManager {
 		}
 
 		// free each segment
-		long releasedMemory = 0L;
-		for (MemorySegment segment : segments) {
+		for (MemorySegment segment: segments) {
 			segment.free();
-			releasedMemory += getPageSize();
 		}
-		releaseMemory(releasedMemory);
 
 		segments.clear();
 	}
@@ -410,7 +401,7 @@ public class MemoryManager {
 			return;
 		}
 
-		reserveMemory(size);
+		memoryBudget.reserveMemory(size);
 
 		reservedMemory.compute(owner, (o, memoryReservedForOwner) ->
 			memoryReservedForOwner == null ? size : memoryReservedForOwner + size);
@@ -450,7 +441,7 @@ public class MemoryManager {
 
 	private long releaseAndCalculateReservedMemory(long memoryToFree, long currentlyReserved) {
 		final long effectiveMemoryToRelease = Math.min(currentlyReserved, memoryToFree);
-		releaseMemory(effectiveMemoryToRelease);
+		memoryBudget.releaseMemory(effectiveMemoryToRelease);
 
 		return currentlyReserved - effectiveMemoryToRelease;
 	}
@@ -470,7 +461,7 @@ public class MemoryManager {
 		checkMemoryReservationPreconditions(owner, 0L);
 		Long memoryReservedForOwner = reservedMemory.remove(owner);
 		if (memoryReservedForOwner != null) {
-			releaseMemory(memoryReservedForOwner);
+			memoryBudget.releaseMemory(memoryReservedForOwner);
 		}
 	}
 
@@ -595,7 +586,7 @@ public class MemoryManager {
 	 * @return The total size of memory.
 	 */
 	public long getMemorySize() {
-		return totalMemorySize;
+		return memoryBudget.getTotalMemorySize();
 	}
 
 	/**
@@ -604,7 +595,7 @@ public class MemoryManager {
 	 * @return The available amount of memory.
 	 */
 	public long availableMemory() {
-		return availableMemorySize.get();
+		return memoryBudget.getAvailableMemorySize();
 	}
 
 	/**
@@ -636,44 +627,7 @@ public class MemoryManager {
 			"The fraction of memory to allocate must within (0, 1], was: %s", fraction);
 
 		//noinspection NumericCastThatLosesPrecision
-		return (long) Math.floor(totalMemorySize * fraction);
-	}
-
-	private void reserveMemory(long size) throws MemoryReservationException {
-		long availableOrReserved = tryReserveMemory(size);
-		if (availableOrReserved < size) {
-			throw new MemoryReservationException(
-				String.format("Could not allocate %d bytes, only %d bytes are remaining", size, availableOrReserved));
-		}
-	}
-
-	private long tryReserveMemory(long size) {
-		long currentAvailableMemorySize;
-		while (size <= (currentAvailableMemorySize = availableMemorySize.get())) {
-			if (availableMemorySize.compareAndSet(currentAvailableMemorySize, currentAvailableMemorySize - size)) {
-				return size;
-			}
-		}
-		return currentAvailableMemorySize;
-	}
-
-	private void releaseMemory(@Nonnegative long size) {
-		if (size == 0) {
-			return;
-		}
-		boolean released = false;
-		long currentAvailableMemorySize = 0L;
-		while (!released && totalMemorySize >= (currentAvailableMemorySize = availableMemorySize.get()) + size) {
-			released = availableMemorySize
-				.compareAndSet(currentAvailableMemorySize, currentAvailableMemorySize + size);
-		}
-		if (!released) {
-			throw new IllegalStateException(String.format(
-				"Trying to release more managed memory (%d bytes) than has been allocated (%d bytes), the total size is %d bytes",
-				size,
-				currentAvailableMemorySize,
-				totalMemorySize));
-		}
+		return (long) Math.floor(memoryBudget.getTotalMemorySize() * fraction);
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
new file mode 100644
index 0000000..a85f40e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.memory;
+
+import org.apache.flink.util.JavaGcCleanerWrapper;
+
+import javax.annotation.Nonnegative;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Tracker of memory allocation and release within a custom limit.
+ *
+ * <p>This memory management in Flink uses the same approach as Java uses to allocate direct memory
+ * and release it upon GC but memory here can be also released directly with {@link #releaseMemory(long)}.
+ * The memory reservation {@link #reserveMemory(long)} tries firstly to run all phantom cleaners, created with
+ * {@link org.apache.flink.core.memory.MemoryUtils#createMemoryGcCleaner(Object, long, Runnable)},
+ * for objects which are ready to be garbage collected. If memory is still not available, it triggers GC and
+ * continues to process any ready cleaners making {@link #MAX_SLEEPS} attempts before throwing {@link MemoryReservationException}.
+ */
+class UnsafeMemoryBudget {
+	// max. number of sleeps during try-reserving with exponentially
+	// increasing delay before throwing MemoryReservationException:
+	// 1, 2, 4, 8, 16, 32, 64, 128, 256, 512 (total 1023 ms ~ 1 s)
+	// which means that MemoryReservationException is thrown after about 1 s of sleeping in total
+	private static final int MAX_SLEEPS = 10;
+	private static final int RETRIGGER_GC_AFTER_SLEEPS = 9; // System.gc() is triggered again after this many sleeps (~ 0.5 sec)
+
+	private final long totalMemorySize;
+
+	private final AtomicLong availableMemorySize;
+
+	UnsafeMemoryBudget(long totalMemorySize) {
+		this.totalMemorySize = totalMemorySize;
+		this.availableMemorySize = new AtomicLong(totalMemorySize);
+	}
+
+	long getTotalMemorySize() {
+		return totalMemorySize;
+	}
+
+	long getAvailableMemorySize() {
+		return availableMemorySize.get();
+	}
+
+	boolean verifyEmpty() {
+		try {
+			reserveMemory(totalMemorySize);
+		} catch (MemoryReservationException e) {
+			return false;
+		}
+		releaseMemory(totalMemorySize);
+		return availableMemorySize.get() == totalMemorySize;
+	}
+
+	/**
+	 * Reserve memory of certain size if it is available.
+	 *
+	 * <p>Adjusted version of {@link java.nio.Bits#reserveMemory(long, int)} taken from Java 11.
+	 */
+	@SuppressWarnings({"OverlyComplexMethod", "JavadocReference", "NestedTryStatement"})
+	void reserveMemory(long size) throws MemoryReservationException {
+		long availableOrReserved = tryReserveMemory(size);
+		// optimist!
+		if (availableOrReserved >= size) {
+			return;
+		}
+
+		boolean interrupted = false;
+		try {
+
+			// Retry allocation until success or there are no more
+			// references (including Cleaners that might free direct
+			// buffer memory) to process and allocation still fails.
+			boolean refprocActive;
+			do {
+				try {
+					refprocActive = JavaGcCleanerWrapper.tryRunPendingCleaners();
+				} catch (InterruptedException e) {
+					// Defer interrupts and keep trying.
+					interrupted = true;
+					refprocActive = true;
+				}
+				availableOrReserved = tryReserveMemory(size);
+				if (availableOrReserved >= size) {
+					return;
+				}
+			} while (refprocActive);
+
+			// trigger VM's Reference processing
+			System.gc();
+
+			// A retry loop with exponential back-off delays.
+			// Sometimes it would suffice to give up once reference
+			// processing is complete.  But if there are many threads
+			// competing for memory, this gives more opportunities for
+			// any given thread to make progress.  In particular, this
+			// seems to be enough for a stress test like
+			// DirectBufferAllocTest to (usually) succeed, while
+			// without it that test likely fails.  Since failure here
+			// ends in MemoryReservationException, there's no need to hurry.
+			long sleepTime = 1;
+			int sleeps = 0;
+			while (true) {
+				availableOrReserved = tryReserveMemory(size);
+				if (availableOrReserved >= size) {
+					return;
+				}
+				if (sleeps >= MAX_SLEEPS) {
+					break;
+				}
+				if (sleeps >= RETRIGGER_GC_AFTER_SLEEPS) {
+					// trigger again VM's Reference processing if we have to wait longer
+					System.gc();
+				}
+				try {
+					if (!JavaGcCleanerWrapper.tryRunPendingCleaners()) {
+						Thread.sleep(sleepTime);
+						sleepTime <<= 1;
+						sleeps++;
+					}
+				} catch (InterruptedException e) {
+					interrupted = true;
+				}
+			}
+
+			// no luck
+			throw new MemoryReservationException(
+				String.format("Could not allocate %d bytes, only %d bytes are remaining", size, availableOrReserved));
+
+		} finally {
+			if (interrupted) {
+				// don't swallow interrupts
+				Thread.currentThread().interrupt();
+			}
+		}
+	}
+
+	private long tryReserveMemory(long size) {
+		long currentAvailableMemorySize;
+		while (size <= (currentAvailableMemorySize = availableMemorySize.get())) {
+			if (availableMemorySize.compareAndSet(currentAvailableMemorySize, currentAvailableMemorySize - size)) {
+				return size;
+			}
+		}
+		return currentAvailableMemorySize;
+	}
+
+	void releaseMemory(@Nonnegative long size) {
+		if (size == 0) {
+			return;
+		}
+		boolean released = false;
+		long currentAvailableMemorySize = 0L;
+		while (!released && totalMemorySize >= (currentAvailableMemorySize = availableMemorySize.get()) + size) {
+			released = availableMemorySize
+				.compareAndSet(currentAvailableMemorySize, currentAvailableMemorySize + size);
+		}
+		if (!released) {
+			throw new IllegalStateException(String.format(
+				"Trying to release more managed memory (%d bytes) than has been allocated (%d bytes), the total size is %d bytes",
+				size,
+				currentAvailableMemorySize,
+				totalMemorySize));
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
index adccccd..5297525 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
@@ -27,10 +27,12 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Random;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -292,6 +294,52 @@ public class MemoryManagerTest {
 		memoryManager.releaseAllMemory(owner2);
 	}
 
+	@Test(expected = MemoryAllocationException.class)
+	public void testAllocationFailsIfSegmentsNotGced() throws MemoryAllocationException {
+		List<ByteBuffer> wrappedBuffers = allocateAndReleaseAllSegmentsButKeepWrappedBufferRefs();
+		// all pages were handed out and their wrapped buffers are still referenced: expected to throw
+		memoryManager.allocatePages(new Object(), 1);
+		// not reached when the test passes; the access only keeps the wrapped buffers reachable, preventing GC
+		wrappedBuffers.get(0).put(0, (byte) 1);
+	}
+
+	@Test(expected = MemoryReservationException.class)
+	public void testReservationFailsIfSegmentsNotGced() throws MemoryAllocationException, MemoryReservationException {
+		List<ByteBuffer> wrappedBuffers = allocateAndReleaseAllSegmentsButKeepWrappedBufferRefs();
+		// all pages were handed out and their wrapped buffers are still referenced: this reservation should fail
+		memoryManager.reserveMemory(new Object(), MemoryManager.DEFAULT_PAGE_SIZE);
+		// not reached when the test passes; the access only keeps the wrapped buffers reachable, preventing GC
+		wrappedBuffers.get(0).put(0, (byte) 1);
+	}
+
+	@Test
+	public void testAllocationSuccessIfSegmentsGced() throws MemoryAllocationException {
+		allocateAndReleaseAllSegmentsButKeepWrappedBufferRefs();
+		// the returned buffers were dropped, so GC should free the memory and this allocation should succeed
+		List<MemorySegment> singlePage = memoryManager.allocatePages(new Object(), 1);
+		memoryManager.release(singlePage);
+	}
+
+	@Test
+	public void testReservationSuccessIfSegmentsGced() throws MemoryAllocationException, MemoryReservationException {
+		allocateAndReleaseAllSegmentsButKeepWrappedBufferRefs();
+		// no reference to the allocated segments at this point, so the memory should be released by GC
+		Object owner = new Object();
+		// and this reservation should be successful
+		memoryManager.reserveMemory(owner, MemoryManager.DEFAULT_PAGE_SIZE);
+		memoryManager.releaseMemory(owner, MemoryManager.DEFAULT_PAGE_SIZE);
+	}
+
+	private List<ByteBuffer> allocateAndReleaseAllSegmentsButKeepWrappedBufferRefs() throws MemoryAllocationException {
+		List<MemorySegment> allPages = memoryManager.allocatePages(new Object(), NUM_PAGES);
+		// wrap the first byte of every page; callers hold on to these buffers to prevent GC of the memory
+		List<ByteBuffer> wrappedBuffers = allPages.stream()
+			.map(page -> page.wrap(0, 1))
+			.collect(Collectors.toList());
+		memoryManager.release(allPages);
+		return wrappedBuffers;
+	}
+
 	@Test
 	public void testComputeMemorySize() {
 		double fraction = 0.6;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/UnsafeMemoryBudgetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/UnsafeMemoryBudgetTest.java
new file mode 100644
index 0000000..4f6edd8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/UnsafeMemoryBudgetTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.memory;
+
+import org.apache.flink.util.JavaGcCleanerWrapper;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/** Tests the reserve/release accounting of {@link UnsafeMemoryBudget}, including release through GC cleaners. */
+public class UnsafeMemoryBudgetTest extends TestLogger {
+
+	@Test
+	public void testGetTotalMemory() {
+		UnsafeMemoryBudget memoryBudget = new UnsafeMemoryBudget(100L);
+		assertThat(memoryBudget.getTotalMemorySize(), is(100L));
+	}
+
+	@Test
+	public void testReserveMemory() throws MemoryReservationException {
+		UnsafeMemoryBudget memoryBudget = new UnsafeMemoryBudget(100L);
+		memoryBudget.reserveMemory(50L);
+		assertThat(memoryBudget.getAvailableMemorySize(), is(50L)); // 100 total - 50 reserved
+	}
+
+	@Test(expected = MemoryReservationException.class)
+	public void testReserveMemoryOverLimitFails() throws MemoryReservationException {
+		UnsafeMemoryBudget memoryBudget = new UnsafeMemoryBudget(100L);
+		memoryBudget.reserveMemory(120L); // more than the total of 100
+	}
+
+	@Test
+	public void testReleaseMemory() throws MemoryReservationException {
+		UnsafeMemoryBudget memoryBudget = new UnsafeMemoryBudget(100L);
+		memoryBudget.reserveMemory(50L);
+		memoryBudget.releaseMemory(30L);
+		assertThat(memoryBudget.getAvailableMemorySize(), is(80L)); // 100 - 50 + 30
+	}
+
+	@Test(expected = IllegalStateException.class)
+	public void testReleaseMemoryMoreThanReservedFails() throws MemoryReservationException {
+		UnsafeMemoryBudget memoryBudget = new UnsafeMemoryBudget(100L);
+		memoryBudget.reserveMemory(50L);
+		memoryBudget.releaseMemory(70L); // only 50 are reserved
+	}
+
+	@Test(expected = MemoryReservationException.class)
+	public void testReservationFailsIfOwnerNotGced() throws MemoryReservationException {
+		UnsafeMemoryBudget memoryBudget = new UnsafeMemoryBudget(100L);
+		memoryBudget.reserveMemory(50L);
+		Object memoryOwner = new Object();
+		JavaGcCleanerWrapper.createCleaner(memoryOwner, () -> memoryBudget.releaseMemory(50L));
+		memoryBudget.reserveMemory(60L); // expected to throw: only 50 of 100 are available
+		// not reached when the test passes; the use below only keeps memoryOwner referenced, preventing its GC
+		log.info(memoryOwner.toString());
+	}
+
+	@Test
+	public void testReservationSuccessIfOwnerGced() throws MemoryReservationException {
+		UnsafeMemoryBudget memoryBudget = new UnsafeMemoryBudget(100L);
+		memoryBudget.reserveMemory(50L);
+		JavaGcCleanerWrapper.createCleaner(new Object(), () -> memoryBudget.releaseMemory(50L));
+		memoryBudget.reserveMemory(60L); // can only fit after the cleaner action has released the first 50
+		assertThat(memoryBudget.getAvailableMemorySize(), is(40L)); // 100 total - 60 reserved
+	}
+}


[flink] 04/04: [hotfix] remove IntelliJ '//noinspection ...' directives from MemoryManager

Posted by az...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b8fcdd66f98efbab05db25e028c9ebdc585c3155
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Thu Apr 23 17:25:28 2020 +0300

    [hotfix] remove IntelliJ '//noinspection ...' directives from MemoryManager
---
 .../src/main/java/org/apache/flink/runtime/memory/MemoryManager.java  | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
index 948053a..bb26549 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
@@ -304,7 +304,6 @@ public class MemoryManager {
 			// if not, we can change it to the simpler approach for the better readability.
 			Iterator<MemorySegment> segmentsIterator = segments.iterator();
 
-			//noinspection ProhibitedExceptionCaught
 			try {
 				MemorySegment segment = null;
 				while (segment == null && segmentsIterator.hasNext()) {
@@ -576,7 +575,6 @@ public class MemoryManager {
 	 * @return The size of the pages handled by the memory manager.
 	 */
 	public int getPageSize() {
-		//noinspection NumericCastThatLosesPrecision
 		return (int) pageSize;
 	}
 
@@ -611,7 +609,6 @@ public class MemoryManager {
 			throw new IllegalArgumentException("The fraction of memory to allocate must within (0, 1].");
 		}
 
-		//noinspection NumericCastThatLosesPrecision
 		return (int) (totalNumberOfPages * fraction);
 	}
 
@@ -626,7 +623,6 @@ public class MemoryManager {
 			fraction > 0 && fraction <= 1,
 			"The fraction of memory to allocate must within (0, 1], was: %s", fraction);
 
-		//noinspection NumericCastThatLosesPrecision
 		return (long) Math.floor(memoryBudget.getTotalMemorySize() * fraction);
 	}
 


[flink] 02/04: [FLINK-15758][MemManager] Remove MemoryManager#AllocationRequest

Posted by az...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 33ba6be0b54a3033cb1387a8f4ba39d2f8be3e2d
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Tue Feb 4 15:29:08 2020 +0100

    [FLINK-15758][MemManager] Remove MemoryManager#AllocationRequest
---
 .../apache/flink/runtime/memory/MemoryManager.java | 94 ----------------------
 .../flink/runtime/memory/MemoryManagerTest.java    | 16 ++--
 2 files changed, 7 insertions(+), 103 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
index 72df659..e9c7b30 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
@@ -192,9 +192,7 @@ public class MemoryManager {
 	 * @return A list with the memory segments.
 	 * @throws MemoryAllocationException Thrown, if this memory manager does not have the requested amount
 	 *                                   of memory pages any more.
-	 * @deprecated use {@link #allocatePages(AllocationRequest)}
 	 */
-	@Deprecated
 	public List<MemorySegment> allocatePages(Object owner, int numPages) throws MemoryAllocationException {
 		List<MemorySegment> segments = new ArrayList<>(numPages);
 		allocatePages(owner, segments, numPages);
@@ -211,36 +209,11 @@ public class MemoryManager {
 	 * @param numberOfPages The number of pages to allocate.
 	 * @throws MemoryAllocationException Thrown, if this memory manager does not have the requested amount
 	 *                                   of memory pages any more.
-	 * @deprecated use {@link #allocatePages(AllocationRequest)}
 	 */
-	@Deprecated
 	public void allocatePages(
 			Object owner,
 			Collection<MemorySegment> target,
 			int numberOfPages) throws MemoryAllocationException {
-		allocatePages(AllocationRequest
-			.newBuilder(owner)
-			.numberOfPages(numberOfPages)
-			.withOutput(target)
-			.build());
-	}
-
-	/**
-	 * Allocates a set of memory segments from this memory manager.
-	 *
-	 * <p>The allocated segments can have any memory type. The total allocated memory for each type will not exceed its
-	 * size limit, announced in the constructor.
-	 *
-	 * @param request The allocation request which contains all the parameters.
-	 * @return A collection with the allocated memory segments.
-	 * @throws MemoryAllocationException Thrown, if this memory manager does not have the requested amount
-	 *                                   of memory pages any more.
-	 */
-	public Collection<MemorySegment> allocatePages(AllocationRequest request) throws MemoryAllocationException {
-		Object owner = request.getOwner();
-		Collection<MemorySegment> target = request.output;
-		int numberOfPages = request.getNumberOfPages();
-
 		// sanity check
 		Preconditions.checkNotNull(owner, "The memory owner must not be null.");
 		Preconditions.checkState(!isShutDown, "Memory manager has been shut down.");
@@ -274,8 +247,6 @@ public class MemoryManager {
 		});
 
 		Preconditions.checkState(!isShutDown, "Memory manager has been concurrently shut down.");
-
-		return target;
 	}
 
 	/**
@@ -705,71 +676,6 @@ public class MemoryManager {
 		}
 	}
 
-	/** Memory segment allocation request. */
-	@SuppressWarnings("WeakerAccess")
-	public static class AllocationRequest {
-		/** Owner of the segment to track by. */
-		private final Object owner;
-
-		/** Collection to add the allocated segments to. */
-		private final Collection<MemorySegment> output;
-
-		/** Number of pages to allocate. */
-		private final int numberOfPages;
-
-		private AllocationRequest(
-				Object owner,
-				Collection<MemorySegment> output,
-				int numberOfPages) {
-			this.owner = owner;
-			this.output = output;
-			this.numberOfPages = numberOfPages;
-		}
-
-		public Object getOwner() {
-			return owner;
-		}
-
-		public int getNumberOfPages() {
-			return numberOfPages;
-		}
-
-		public static Builder newBuilder(Object owner) {
-			return new Builder(owner);
-		}
-
-		public static AllocationRequest forOf(Object owner, int numberOfPages) {
-			return newBuilder(owner).numberOfPages(numberOfPages).build();
-		}
-	}
-
-	/** A builder for the {@link AllocationRequest}. */
-	@SuppressWarnings("WeakerAccess")
-	public static class Builder {
-		private final Object owner;
-		private Collection<MemorySegment> output = new ArrayList<>();
-		private int numberOfPages = 1;
-
-		public Builder(Object owner) {
-			this.owner = owner;
-		}
-
-		public Builder withOutput(Collection<MemorySegment> output) {
-			//noinspection AssignmentOrReturnOfFieldWithMutableType
-			this.output = output;
-			return this;
-		}
-
-		public Builder numberOfPages(int numberOfPages) {
-			this.numberOfPages = numberOfPages;
-			return this;
-		}
-
-		public AllocationRequest build() {
-			return new AllocationRequest(owner, output, numberOfPages);
-		}
-	}
-
 	// ------------------------------------------------------------------------
 	//  factories for testing
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
index 6a553df..adccccd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.memory;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryManager.AllocationRequest;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 
 import org.junit.After;
@@ -33,7 +32,6 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Random;
 
-import static org.apache.flink.runtime.memory.MemoryManager.AllocationRequest.forOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
@@ -165,7 +163,7 @@ public class MemoryManagerTest {
 
 			List<MemorySegment> segs = this.memoryManager.allocatePages(mockInvoke, NUM_PAGES);
 
-			testCannotAllocateAnymore(forOf(mockInvoke, 1));
+			testCannotAllocateAnymore(mockInvoke, 1);
 
 			Assert.assertTrue("The previously allocated segments were not valid any more.",
 																	allMemorySegmentsValid(segs));
@@ -182,13 +180,13 @@ public class MemoryManagerTest {
 	public void doubleReleaseReturnsMemoryOnlyOnce() throws MemoryAllocationException {
 		final AbstractInvokable mockInvoke = new DummyInvokable();
 
-		Collection<MemorySegment> segs = this.memoryManager.allocatePages(forOf(mockInvoke, NUM_PAGES));
+		Collection<MemorySegment> segs = this.memoryManager.allocatePages(mockInvoke, NUM_PAGES);
 		MemorySegment segment = segs.iterator().next();
 
 		this.memoryManager.release(segment);
 		this.memoryManager.release(segment);
 
-		testCannotAllocateAnymore(forOf(mockInvoke, 2));
+		testCannotAllocateAnymore(mockInvoke, 2);
 
 		this.memoryManager.releaseAll(mockInvoke);
 	}
@@ -281,13 +279,13 @@ public class MemoryManagerTest {
 
 		// allocate half memory for segments
 		Object owner1 = new Object();
-		memoryManager.allocatePages(forOf(owner1, totalPagesForType / 2));
+		memoryManager.allocatePages(owner1, totalPagesForType / 2);
 
 		// reserve the other half of memory
 		Object owner2 = new Object();
 		memoryManager.reserveMemory(owner2, (long) PAGE_SIZE * totalPagesForType / 2);
 
-		testCannotAllocateAnymore(forOf(new Object(), 1));
+		testCannotAllocateAnymore(new Object(), 1);
 		testCannotReserveAnymore(1L);
 
 		memoryManager.releaseAll(owner1);
@@ -318,9 +316,9 @@ public class MemoryManagerTest {
 		memoryManager.computeMemorySize(-0.1);
 	}
 
-	private void testCannotAllocateAnymore(AllocationRequest request) {
+	private void testCannotAllocateAnymore(Object owner, int numPages) {
 		try {
-			memoryManager.allocatePages(request);
+			memoryManager.allocatePages(owner, numPages);
 			Assert.fail("Expected MemoryAllocationException. " +
 				"We should not be able to allocate after allocating or(and) reserving all memory of a certain type.");
 		} catch (MemoryAllocationException maex) {