You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by az...@apache.org on 2020/05/19 07:46:54 UTC

[flink] 02/04: [FLINK-15758][MemManager] Remove MemoryManager#AllocationRequest

This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 33ba6be0b54a3033cb1387a8f4ba39d2f8be3e2d
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Tue Feb 4 15:29:08 2020 +0100

    [FLINK-15758][MemManager] Remove MemoryManager#AllocationRequest
---
 .../apache/flink/runtime/memory/MemoryManager.java | 94 ----------------------
 .../flink/runtime/memory/MemoryManagerTest.java    | 16 ++--
 2 files changed, 7 insertions(+), 103 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
index 72df659..e9c7b30 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
@@ -192,9 +192,7 @@ public class MemoryManager {
 	 * @return A list with the memory segments.
 	 * @throws MemoryAllocationException Thrown, if this memory manager does not have the requested amount
 	 *                                   of memory pages any more.
-	 * @deprecated use {@link #allocatePages(AllocationRequest)}
 	 */
-	@Deprecated
 	public List<MemorySegment> allocatePages(Object owner, int numPages) throws MemoryAllocationException {
 		List<MemorySegment> segments = new ArrayList<>(numPages);
 		allocatePages(owner, segments, numPages);
@@ -211,36 +209,11 @@ public class MemoryManager {
 	 * @param numberOfPages The number of pages to allocate.
 	 * @throws MemoryAllocationException Thrown, if this memory manager does not have the requested amount
 	 *                                   of memory pages any more.
-	 * @deprecated use {@link #allocatePages(AllocationRequest)}
 	 */
-	@Deprecated
 	public void allocatePages(
 			Object owner,
 			Collection<MemorySegment> target,
 			int numberOfPages) throws MemoryAllocationException {
-		allocatePages(AllocationRequest
-			.newBuilder(owner)
-			.numberOfPages(numberOfPages)
-			.withOutput(target)
-			.build());
-	}
-
-	/**
-	 * Allocates a set of memory segments from this memory manager.
-	 *
-	 * <p>The allocated segments can have any memory type. The total allocated memory for each type will not exceed its
-	 * size limit, announced in the constructor.
-	 *
-	 * @param request The allocation request which contains all the parameters.
-	 * @return A collection with the allocated memory segments.
-	 * @throws MemoryAllocationException Thrown, if this memory manager does not have the requested amount
-	 *                                   of memory pages any more.
-	 */
-	public Collection<MemorySegment> allocatePages(AllocationRequest request) throws MemoryAllocationException {
-		Object owner = request.getOwner();
-		Collection<MemorySegment> target = request.output;
-		int numberOfPages = request.getNumberOfPages();
-
 		// sanity check
 		Preconditions.checkNotNull(owner, "The memory owner must not be null.");
 		Preconditions.checkState(!isShutDown, "Memory manager has been shut down.");
@@ -274,8 +247,6 @@ public class MemoryManager {
 		});
 
 		Preconditions.checkState(!isShutDown, "Memory manager has been concurrently shut down.");
-
-		return target;
 	}
 
 	/**
@@ -705,71 +676,6 @@ public class MemoryManager {
 		}
 	}
 
-	/** Memory segment allocation request. */
-	@SuppressWarnings("WeakerAccess")
-	public static class AllocationRequest {
-		/** Owner of the segment to track by. */
-		private final Object owner;
-
-		/** Collection to add the allocated segments to. */
-		private final Collection<MemorySegment> output;
-
-		/** Number of pages to allocate. */
-		private final int numberOfPages;
-
-		private AllocationRequest(
-				Object owner,
-				Collection<MemorySegment> output,
-				int numberOfPages) {
-			this.owner = owner;
-			this.output = output;
-			this.numberOfPages = numberOfPages;
-		}
-
-		public Object getOwner() {
-			return owner;
-		}
-
-		public int getNumberOfPages() {
-			return numberOfPages;
-		}
-
-		public static Builder newBuilder(Object owner) {
-			return new Builder(owner);
-		}
-
-		public static AllocationRequest forOf(Object owner, int numberOfPages) {
-			return newBuilder(owner).numberOfPages(numberOfPages).build();
-		}
-	}
-
-	/** A builder for the {@link AllocationRequest}. */
-	@SuppressWarnings("WeakerAccess")
-	public static class Builder {
-		private final Object owner;
-		private Collection<MemorySegment> output = new ArrayList<>();
-		private int numberOfPages = 1;
-
-		public Builder(Object owner) {
-			this.owner = owner;
-		}
-
-		public Builder withOutput(Collection<MemorySegment> output) {
-			//noinspection AssignmentOrReturnOfFieldWithMutableType
-			this.output = output;
-			return this;
-		}
-
-		public Builder numberOfPages(int numberOfPages) {
-			this.numberOfPages = numberOfPages;
-			return this;
-		}
-
-		public AllocationRequest build() {
-			return new AllocationRequest(owner, output, numberOfPages);
-		}
-	}
-
 	// ------------------------------------------------------------------------
 	//  factories for testing
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
index 6a553df..adccccd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.memory;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryManager.AllocationRequest;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 
 import org.junit.After;
@@ -33,7 +32,6 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Random;
 
-import static org.apache.flink.runtime.memory.MemoryManager.AllocationRequest.forOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
@@ -165,7 +163,7 @@ public class MemoryManagerTest {
 
 			List<MemorySegment> segs = this.memoryManager.allocatePages(mockInvoke, NUM_PAGES);
 
-			testCannotAllocateAnymore(forOf(mockInvoke, 1));
+			testCannotAllocateAnymore(mockInvoke, 1);
 
 			Assert.assertTrue("The previously allocated segments were not valid any more.",
 																	allMemorySegmentsValid(segs));
@@ -182,13 +180,13 @@ public class MemoryManagerTest {
 	public void doubleReleaseReturnsMemoryOnlyOnce() throws MemoryAllocationException {
 		final AbstractInvokable mockInvoke = new DummyInvokable();
 
-		Collection<MemorySegment> segs = this.memoryManager.allocatePages(forOf(mockInvoke, NUM_PAGES));
+		Collection<MemorySegment> segs = this.memoryManager.allocatePages(mockInvoke, NUM_PAGES);
 		MemorySegment segment = segs.iterator().next();
 
 		this.memoryManager.release(segment);
 		this.memoryManager.release(segment);
 
-		testCannotAllocateAnymore(forOf(mockInvoke, 2));
+		testCannotAllocateAnymore(mockInvoke, 2);
 
 		this.memoryManager.releaseAll(mockInvoke);
 	}
@@ -281,13 +279,13 @@ public class MemoryManagerTest {
 
 		// allocate half memory for segments
 		Object owner1 = new Object();
-		memoryManager.allocatePages(forOf(owner1, totalPagesForType / 2));
+		memoryManager.allocatePages(owner1, totalPagesForType / 2);
 
 		// reserve the other half of memory
 		Object owner2 = new Object();
 		memoryManager.reserveMemory(owner2, (long) PAGE_SIZE * totalPagesForType / 2);
 
-		testCannotAllocateAnymore(forOf(new Object(), 1));
+		testCannotAllocateAnymore(new Object(), 1);
 		testCannotReserveAnymore(1L);
 
 		memoryManager.releaseAll(owner1);
@@ -318,9 +316,9 @@ public class MemoryManagerTest {
 		memoryManager.computeMemorySize(-0.1);
 	}
 
-	private void testCannotAllocateAnymore(AllocationRequest request) {
+	private void testCannotAllocateAnymore(Object owner, int numPages) {
 		try {
-			memoryManager.allocatePages(request);
+			memoryManager.allocatePages(owner, numPages);
 			Assert.fail("Expected MemoryAllocationException. " +
 				"We should not be able to allocate after allocating or(and) reserving all memory of a certain type.");
 		} catch (MemoryAllocationException maex) {