You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/08/28 14:48:19 UTC

[flink] 03/09: [FLINK-19045][network] Remove obsolete 'taskmanager.network.partition.force-release-on-consumption' option.

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3ad06e3aacda3923f3c8e40eced430177aecb49d
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri Aug 28 13:10:49 2020 +0200

    [FLINK-19045][network] Remove obsolete 'taskmanager.network.partition.force-release-on-consumption' option.
---
 .../NettyShuffleEnvironmentOptions.java            |  5 -----
 .../io/network/NettyShuffleServiceFactory.java     |  1 -
 .../network/partition/ResultPartitionFactory.java  |  6 +-----
 .../NettyShuffleEnvironmentConfiguration.java      | 15 --------------
 .../io/network/NettyShuffleEnvironmentBuilder.java |  1 -
 .../network/netty/PartitionRequestQueueTest.java   | 10 ++++------
 ...edResultPartitionReleaseOnConsumptionTest.java} | 10 +++++-----
 .../network/partition/ResultPartitionBuilder.java  |  8 --------
 .../partition/ResultPartitionFactoryTest.java      | 23 +++++++---------------
 .../io/network/partition/ResultPartitionTest.java  |  1 -
 10 files changed, 17 insertions(+), 63 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
index 832cf84..584bc6e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
@@ -298,11 +298,6 @@ public class NettyShuffleEnvironmentOptions {
 
 	// ------------------------------------------------------------------------
 
-	@Documentation.ExcludeFromDocumentation("dev use only; likely temporary")
-	public static final ConfigOption<Boolean> FORCE_PARTITION_RELEASE_ON_CONSUMPTION =
-		key("taskmanager.network.partition.force-release-on-consumption")
-			.defaultValue(false);
-
 	/** Not intended to be instantiated. */
 	private NettyShuffleEnvironmentOptions() {}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
index d6feb9b..d5863d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
@@ -124,7 +124,6 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh
 			config.networkBuffersPerChannel(),
 			config.floatingNetworkBuffersPerGate(),
 			config.networkBufferSize(),
-			config.isForcePartitionReleaseOnConsumption(),
 			config.isBlockingShuffleCompressionEnabled(),
 			config.getCompressionCodec(),
 			config.getMaxBuffersPerChannel());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index 07e39e7..f72f381 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -58,8 +58,6 @@ public class ResultPartitionFactory {
 
 	private final int networkBufferSize;
 
-	private final boolean forcePartitionReleaseOnConsumption;
-
 	private final boolean blockingShuffleCompressionEnabled;
 
 	private final String compressionCodec;
@@ -74,7 +72,6 @@ public class ResultPartitionFactory {
 		int networkBuffersPerChannel,
 		int floatingNetworkBuffersPerGate,
 		int networkBufferSize,
-		boolean forcePartitionReleaseOnConsumption,
 		boolean blockingShuffleCompressionEnabled,
 		String compressionCodec,
 		int maxBuffersPerChannel) {
@@ -86,7 +83,6 @@ public class ResultPartitionFactory {
 		this.bufferPoolFactory = bufferPoolFactory;
 		this.blockingSubpartitionType = blockingSubpartitionType;
 		this.networkBufferSize = networkBufferSize;
-		this.forcePartitionReleaseOnConsumption = forcePartitionReleaseOnConsumption;
 		this.blockingShuffleCompressionEnabled = blockingShuffleCompressionEnabled;
 		this.compressionCodec = compressionCodec;
 		this.maxBuffersPerChannel = maxBuffersPerChannel;
@@ -121,7 +117,7 @@ public class ResultPartitionFactory {
 		}
 
 		ResultSubpartition[] subpartitions = new ResultSubpartition[numberOfSubpartitions];
-		ResultPartition partition = forcePartitionReleaseOnConsumption || !type.isBlocking()
+		ResultPartition partition = !type.isBlocking()
 			? new ReleaseOnConsumptionResultPartition(
 				taskNameWithSubtaskAndId,
 				partitionIndex,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
index feca864..09505dc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
@@ -68,8 +68,6 @@ public class NettyShuffleEnvironmentConfiguration {
 
 	private final BoundedBlockingSubpartitionType blockingSubpartitionType;
 
-	private final boolean forcePartitionReleaseOnConsumption;
-
 	private final boolean blockingShuffleCompressionEnabled;
 
 	private final String compressionCodec;
@@ -88,7 +86,6 @@ public class NettyShuffleEnvironmentConfiguration {
 			@Nullable NettyConfig nettyConfig,
 			String[] tempDirs,
 			BoundedBlockingSubpartitionType blockingSubpartitionType,
-			boolean forcePartitionReleaseOnConsumption,
 			boolean blockingShuffleCompressionEnabled,
 			String compressionCodec,
 			int maxBuffersPerChannel) {
@@ -104,7 +101,6 @@ public class NettyShuffleEnvironmentConfiguration {
 		this.nettyConfig = nettyConfig;
 		this.tempDirs = Preconditions.checkNotNull(tempDirs);
 		this.blockingSubpartitionType = Preconditions.checkNotNull(blockingSubpartitionType);
-		this.forcePartitionReleaseOnConsumption = forcePartitionReleaseOnConsumption;
 		this.blockingShuffleCompressionEnabled = blockingShuffleCompressionEnabled;
 		this.compressionCodec = Preconditions.checkNotNull(compressionCodec);
 		this.maxBuffersPerChannel = maxBuffersPerChannel;
@@ -156,10 +152,6 @@ public class NettyShuffleEnvironmentConfiguration {
 		return blockingSubpartitionType;
 	}
 
-	public boolean isForcePartitionReleaseOnConsumption() {
-		return forcePartitionReleaseOnConsumption;
-	}
-
 	public boolean isBlockingShuffleCompressionEnabled() {
 		return blockingShuffleCompressionEnabled;
 	}
@@ -218,9 +210,6 @@ public class NettyShuffleEnvironmentConfiguration {
 
 		BoundedBlockingSubpartitionType blockingSubpartitionType = getBlockingSubpartitionType(configuration);
 
-		boolean forcePartitionReleaseOnConsumption =
-			configuration.getBoolean(NettyShuffleEnvironmentOptions.FORCE_PARTITION_RELEASE_ON_CONSUMPTION);
-
 		boolean blockingShuffleCompressionEnabled =
 			configuration.get(NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED);
 		String compressionCodec = configuration.getString(NettyShuffleEnvironmentOptions.SHUFFLE_COMPRESSION_CODEC);
@@ -237,7 +226,6 @@ public class NettyShuffleEnvironmentConfiguration {
 			nettyConfig,
 			tempDirs,
 			blockingSubpartitionType,
-			forcePartitionReleaseOnConsumption,
 			blockingShuffleCompressionEnabled,
 			compressionCodec,
 			maxBuffersPerChannel);
@@ -359,7 +347,6 @@ public class NettyShuffleEnvironmentConfiguration {
 		result = 31 * result + requestSegmentsTimeout.hashCode();
 		result = 31 * result + (nettyConfig != null ? nettyConfig.hashCode() : 0);
 		result = 31 * result + Arrays.hashCode(tempDirs);
-		result = 31 * result + (forcePartitionReleaseOnConsumption ? 1 : 0);
 		result = 31 * result + (blockingShuffleCompressionEnabled ? 1 : 0);
 		result = 31 * result + Objects.hashCode(compressionCodec);
 		result = 31 * result + maxBuffersPerChannel;
@@ -386,7 +373,6 @@ public class NettyShuffleEnvironmentConfiguration {
 					this.requestSegmentsTimeout.equals(that.requestSegmentsTimeout) &&
 					(nettyConfig != null ? nettyConfig.equals(that.nettyConfig) : that.nettyConfig == null) &&
 					Arrays.equals(this.tempDirs, that.tempDirs) &&
-					this.forcePartitionReleaseOnConsumption == that.forcePartitionReleaseOnConsumption &&
 					this.blockingShuffleCompressionEnabled == that.blockingShuffleCompressionEnabled &&
 					this.maxBuffersPerChannel == that.maxBuffersPerChannel &&
 					Objects.equals(this.compressionCodec, that.compressionCodec);
@@ -405,7 +391,6 @@ public class NettyShuffleEnvironmentConfiguration {
 				", requestSegmentsTimeout=" + requestSegmentsTimeout +
 				", nettyConfig=" + nettyConfig +
 				", tempDirs=" + Arrays.toString(tempDirs) +
-				", forcePartitionReleaseOnConsumption=" + forcePartitionReleaseOnConsumption +
 				", blockingShuffleCompressionEnabled=" + blockingShuffleCompressionEnabled +
 				", compressionCodec=" + compressionCodec +
 				", maxBuffersPerChannel=" + maxBuffersPerChannel +
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
index ab999fc..2551694 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
@@ -154,7 +154,6 @@ public class NettyShuffleEnvironmentBuilder {
 				nettyConfig,
 				DEFAULT_TEMP_DIRS,
 				BoundedBlockingSubpartitionType.AUTO,
-				false,
 				blockingShuffleCompressionEnabled,
 				compressionCodec,
 				maxBuffersPerChannel),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
index e1c72e0..8eac7bd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
@@ -467,14 +467,13 @@ public class PartitionRequestQueueTest {
 		channel.runPendingTasks();
 
 		assertFalse(queue.getAvailableReaders().contains(reader));
-		// the partition and its reader view should all be released
+
+		// the reader view should be released; the partition itself is not, because blocking
+		// partitions support multiple successive readers for recovery and caching
 		assertTrue(reader.isReleased());
-		assertTrue(partition.isReleased());
-		for (ResultSubpartition subpartition : partition.getAllPartitions()) {
-			assertTrue(subpartition.isReleased());
-		}
 
 		// cleanup
+		partition.release();
 		channel.close();
 	}
 
@@ -483,7 +482,6 @@ public class PartitionRequestQueueTest {
 			.setResultPartitionType(ResultPartitionType.BLOCKING)
 			.setFileChannelManager(fileChannelManager)
 			.setResultPartitionManager(partitionManager)
-			.isReleasedOnConsumption(true)
 			.build();
 
 		partitionManager.registerResultPartition(partition);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartitionReleaseOnConsumptionTest.java
similarity index 88%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartitionTest.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartitionReleaseOnConsumptionTest.java
index ce7467f..f14f1db 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartitionReleaseOnConsumptionTest.java
@@ -25,16 +25,16 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 /**
- * Tests for the {@link ReleaseOnConsumptionResultPartitionTest}.
+ * Tests for the {@link ReleaseOnConsumptionResultPartition} as created for pipelined result partitions.
  */
-public class ReleaseOnConsumptionResultPartitionTest extends TestLogger {
+public class PipelinedResultPartitionReleaseOnConsumptionTest extends TestLogger {
 
 	@Test
 	public void testConsumptionBasedPartitionRelease() {
 		final ResultPartitionManager manager = new ResultPartitionManager();
 		final ResultPartition partition = new ResultPartitionBuilder()
+			.setResultPartitionType(ResultPartitionType.PIPELINED)
 			.setNumberOfSubpartitions(2)
-			.isReleasedOnConsumption(true)
 			.setResultPartitionManager(manager)
 			.build();
 
@@ -51,8 +51,8 @@ public class ReleaseOnConsumptionResultPartitionTest extends TestLogger {
 	public void testMultipleReleaseCallsAreIdempotent() {
 		final ResultPartitionManager manager = new ResultPartitionManager();
 		final ResultPartition partition = new ResultPartitionBuilder()
+			.setResultPartitionType(ResultPartitionType.PIPELINED)
 			.setNumberOfSubpartitions(2)
-			.isReleasedOnConsumption(true)
 			.setResultPartitionManager(manager)
 			.build();
 		manager.registerResultPartition(partition);
@@ -67,8 +67,8 @@ public class ReleaseOnConsumptionResultPartitionTest extends TestLogger {
 	public void testReleaseAfterIdempotentCalls() {
 		final ResultPartitionManager manager = new ResultPartitionManager();
 		final ResultPartition partition = new ResultPartitionBuilder()
+			.setResultPartitionType(ResultPartitionType.PIPELINED)
 			.setNumberOfSubpartitions(2)
-			.isReleasedOnConsumption(true)
 			.setResultPartitionManager(manager)
 			.build();
 		manager.registerResultPartition(partition);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
index 7527408..1fb4f3c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
@@ -63,8 +63,6 @@ public class ResultPartitionBuilder {
 	@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
 	private Optional<FunctionWithException<BufferPoolOwner, BufferPool, IOException>> bufferPoolFactory = Optional.empty();
 
-	private boolean releasedOnConsumption;
-
 	private boolean blockingShuffleCompressionEnabled = false;
 
 	private String compressionCodec = "LZ4";
@@ -137,11 +135,6 @@ public class ResultPartitionBuilder {
 		return this;
 	}
 
-	public ResultPartitionBuilder isReleasedOnConsumption(boolean releasedOnConsumption) {
-		this.releasedOnConsumption = releasedOnConsumption;
-		return this;
-	}
-
 	public ResultPartitionBuilder setBlockingShuffleCompressionEnabled(boolean blockingShuffleCompressionEnabled) {
 		this.blockingShuffleCompressionEnabled = blockingShuffleCompressionEnabled;
 		return this;
@@ -167,7 +160,6 @@ public class ResultPartitionBuilder {
 			networkBuffersPerChannel,
 			floatingNetworkBuffersPerGate,
 			networkBufferSize,
-			releasedOnConsumption,
 			blockingShuffleCompressionEnabled,
 			compressionCodec,
 			maxBuffersPerChannel);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
index ad81759..acb07cb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
@@ -59,37 +59,29 @@ public class ResultPartitionFactoryTest extends TestLogger {
 
 	@Test
 	public void testBoundedBlockingSubpartitionsCreated() {
-		final ResultPartition resultPartition = createResultPartition(false, ResultPartitionType.BLOCKING);
+		final ResultPartition resultPartition = createResultPartition(ResultPartitionType.BLOCKING);
 		Arrays.stream(resultPartition.subpartitions).forEach(sp -> assertThat(sp, instanceOf(BoundedBlockingSubpartition.class)));
 	}
 
 	@Test
 	public void testPipelinedSubpartitionsCreated() {
-		final ResultPartition resultPartition = createResultPartition(false, ResultPartitionType.PIPELINED);
+		final ResultPartition resultPartition = createResultPartition(ResultPartitionType.PIPELINED);
 		Arrays.stream(resultPartition.subpartitions).forEach(sp -> assertThat(sp, instanceOf(PipelinedSubpartition.class)));
 	}
 
 	@Test
-	public void testConsumptionOnReleaseForced() {
-		final ResultPartition resultPartition = createResultPartition(true, ResultPartitionType.BLOCKING);
+	public void testConsumptionOnReleaseForPipelined() {
+		final ResultPartition resultPartition = createResultPartition(ResultPartitionType.PIPELINED);
 		assertThat(resultPartition, instanceOf(ReleaseOnConsumptionResultPartition.class));
 	}
 
 	@Test
-	public void testConsumptionOnReleaseEnabledForNonBlocking() {
-		final ResultPartition resultPartition = createResultPartition(false, ResultPartitionType.PIPELINED);
-		assertThat(resultPartition, instanceOf(ReleaseOnConsumptionResultPartition.class));
-	}
-
-	@Test
-	public void testConsumptionOnReleaseDisabled() {
-		final ResultPartition resultPartition = createResultPartition(false, ResultPartitionType.BLOCKING);
+	public void testNoConsumptionOnReleaseForBlocking() {
+		final ResultPartition resultPartition = createResultPartition(ResultPartitionType.BLOCKING);
 		assertThat(resultPartition, not(instanceOf(ReleaseOnConsumptionResultPartition.class)));
 	}
 
-	private static ResultPartition createResultPartition(
-			boolean releasePartitionOnConsumption,
-			ResultPartitionType partitionType) {
+	private static ResultPartition createResultPartition(ResultPartitionType partitionType) {
 		ResultPartitionFactory factory = new ResultPartitionFactory(
 			new ResultPartitionManager(),
 			fileChannelManager,
@@ -98,7 +90,6 @@ public class ResultPartitionFactoryTest extends TestLogger {
 			1,
 			1,
 			SEGMENT_SIZE,
-			releasePartitionOnConsumption,
 			false,
 			"LZ4",
 			Integer.MAX_VALUE);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 6d21ce1..23e61c8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -159,7 +159,6 @@ public class ResultPartitionTest {
 		ResultPartitionManager manager = new ResultPartitionManager();
 
 		final ResultPartition partition = new ResultPartitionBuilder()
-			.isReleasedOnConsumption(false)
 			.setResultPartitionManager(manager)
 			.setResultPartitionType(ResultPartitionType.BLOCKING)
 			.setFileChannelManager(fileChannelManager)