You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/08/28 14:48:18 UTC

[flink] 02/09: [FLINK-19024][network] Remove unused "releaseMemory" from ResultSubpartition

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0b3f15284d1777925b4bd6c4358fa6dac0867172
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri Aug 21 18:07:38 2020 +0200

    [FLINK-19024][network] Remove unused "releaseMemory" from ResultSubpartition
    
    The releaseMemory() call in the ResultSubpartition is currently not meaningful for any
    existing implementation.
    
    Future versions where memory may have to be released will quite possibly not implement that on a
    subpartition level. For example, a sort based shuffle has the buffers on a partition-level, rather
    than a subpartition level.
    
    We should thus remove the releaseMemory() call from the abstract subpartition interface. Concrete
    implementations can still release memory on a subpartition level, if needed in the future.
---
 .../io/network/partition/BoundedBlockingSubpartition.java     |  7 -------
 .../runtime/io/network/partition/PipelinedSubpartition.java   |  7 -------
 .../flink/runtime/io/network/partition/ResultPartition.java   | 11 +----------
 .../runtime/io/network/partition/ResultSubpartition.java      |  2 --
 .../io/network/partition/PipelinedSubpartitionTest.java       |  8 --------
 5 files changed, 1 insertion(+), 34 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
index abb9bc8..767bd23 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
@@ -242,13 +242,6 @@ final class BoundedBlockingSubpartition extends ResultSubpartition {
 		}
 	}
 
-	// ------------------------------ legacy ----------------------------------
-
-	@Override
-	public int releaseMemory() throws IOException {
-		return 0;
-	}
-
 	// ---------------------------- statistics --------------------------------
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 3e294a0..b99bf02 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -293,13 +293,6 @@ public class PipelinedSubpartition extends ResultSubpartition {
 	}
 
 	@Override
-	public int releaseMemory() {
-		// The pipelined subpartition does not react to memory release requests.
-		// The buffers will be recycled by the consuming task.
-		return 0;
-	}
-
-	@Override
 	public boolean isReleased() {
 		return isReleased;
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 2d8eb32..d47b816 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -341,16 +341,7 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 	 */
 	@Override
 	public void releaseMemory(int toRelease) throws IOException {
-		checkArgument(toRelease > 0);
-
-		for (ResultSubpartition subpartition : subpartitions) {
-			toRelease -= subpartition.releaseMemory();
-
-			// Only release as much memory as needed
-			if (toRelease <= 0) {
-				break;
-			}
-		}
+		// default behavior is nothing to release
 	}
 
 	/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
index 506ae53..c2016c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
@@ -127,8 +127,6 @@ public abstract class ResultSubpartition {
 
 	public abstract ResultSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException;
 
-	abstract int releaseMemory() throws IOException;
-
 	public abstract boolean isReleased();
 
 	/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 30b41be..95aee00 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -295,14 +295,6 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 		verifyViewReleasedAfterParentRelease(partition);
 	}
 
-	@Test
-	public void testReleaseParentAfterSpilled() throws Exception {
-		final ResultSubpartition partition = createSubpartition();
-		partition.releaseMemory();
-
-		verifyViewReleasedAfterParentRelease(partition);
-	}
-
 	private void verifyViewReleasedAfterParentRelease(ResultSubpartition partition) throws Exception {
 		// Add a bufferConsumer
 		BufferConsumer bufferConsumer = createFilledFinishedBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE);