You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/08/28 14:48:19 UTC
[flink] 03/09: [FLINK-19045][network] Remove obsolete
'taskmanager.network.partition.force-release-on-consumption' option.
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3ad06e3aacda3923f3c8e40eced430177aecb49d
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri Aug 28 13:10:49 2020 +0200
[FLINK-19045][network] Remove obsolete 'taskmanager.network.partition.force-release-on-consumption' option.
---
.../NettyShuffleEnvironmentOptions.java | 5 -----
.../io/network/NettyShuffleServiceFactory.java | 1 -
.../network/partition/ResultPartitionFactory.java | 6 +-----
.../NettyShuffleEnvironmentConfiguration.java | 15 --------------
.../io/network/NettyShuffleEnvironmentBuilder.java | 1 -
.../network/netty/PartitionRequestQueueTest.java | 10 ++++------
...edResultPartitionReleaseOnConsumptionTest.java} | 10 +++++-----
.../network/partition/ResultPartitionBuilder.java | 8 --------
.../partition/ResultPartitionFactoryTest.java | 23 +++++++---------------
.../io/network/partition/ResultPartitionTest.java | 1 -
10 files changed, 17 insertions(+), 63 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
index 832cf84..584bc6e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
@@ -298,11 +298,6 @@ public class NettyShuffleEnvironmentOptions {
// ------------------------------------------------------------------------
- @Documentation.ExcludeFromDocumentation("dev use only; likely temporary")
- public static final ConfigOption<Boolean> FORCE_PARTITION_RELEASE_ON_CONSUMPTION =
- key("taskmanager.network.partition.force-release-on-consumption")
- .defaultValue(false);
-
/** Not intended to be instantiated. */
private NettyShuffleEnvironmentOptions() {}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
index d6feb9b..d5863d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
@@ -124,7 +124,6 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh
config.networkBuffersPerChannel(),
config.floatingNetworkBuffersPerGate(),
config.networkBufferSize(),
- config.isForcePartitionReleaseOnConsumption(),
config.isBlockingShuffleCompressionEnabled(),
config.getCompressionCodec(),
config.getMaxBuffersPerChannel());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index 07e39e7..f72f381 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -58,8 +58,6 @@ public class ResultPartitionFactory {
private final int networkBufferSize;
- private final boolean forcePartitionReleaseOnConsumption;
-
private final boolean blockingShuffleCompressionEnabled;
private final String compressionCodec;
@@ -74,7 +72,6 @@ public class ResultPartitionFactory {
int networkBuffersPerChannel,
int floatingNetworkBuffersPerGate,
int networkBufferSize,
- boolean forcePartitionReleaseOnConsumption,
boolean blockingShuffleCompressionEnabled,
String compressionCodec,
int maxBuffersPerChannel) {
@@ -86,7 +83,6 @@ public class ResultPartitionFactory {
this.bufferPoolFactory = bufferPoolFactory;
this.blockingSubpartitionType = blockingSubpartitionType;
this.networkBufferSize = networkBufferSize;
- this.forcePartitionReleaseOnConsumption = forcePartitionReleaseOnConsumption;
this.blockingShuffleCompressionEnabled = blockingShuffleCompressionEnabled;
this.compressionCodec = compressionCodec;
this.maxBuffersPerChannel = maxBuffersPerChannel;
@@ -121,7 +117,7 @@ public class ResultPartitionFactory {
}
ResultSubpartition[] subpartitions = new ResultSubpartition[numberOfSubpartitions];
- ResultPartition partition = forcePartitionReleaseOnConsumption || !type.isBlocking()
+ ResultPartition partition = !type.isBlocking()
? new ReleaseOnConsumptionResultPartition(
taskNameWithSubtaskAndId,
partitionIndex,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
index feca864..09505dc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
@@ -68,8 +68,6 @@ public class NettyShuffleEnvironmentConfiguration {
private final BoundedBlockingSubpartitionType blockingSubpartitionType;
- private final boolean forcePartitionReleaseOnConsumption;
-
private final boolean blockingShuffleCompressionEnabled;
private final String compressionCodec;
@@ -88,7 +86,6 @@ public class NettyShuffleEnvironmentConfiguration {
@Nullable NettyConfig nettyConfig,
String[] tempDirs,
BoundedBlockingSubpartitionType blockingSubpartitionType,
- boolean forcePartitionReleaseOnConsumption,
boolean blockingShuffleCompressionEnabled,
String compressionCodec,
int maxBuffersPerChannel) {
@@ -104,7 +101,6 @@ public class NettyShuffleEnvironmentConfiguration {
this.nettyConfig = nettyConfig;
this.tempDirs = Preconditions.checkNotNull(tempDirs);
this.blockingSubpartitionType = Preconditions.checkNotNull(blockingSubpartitionType);
- this.forcePartitionReleaseOnConsumption = forcePartitionReleaseOnConsumption;
this.blockingShuffleCompressionEnabled = blockingShuffleCompressionEnabled;
this.compressionCodec = Preconditions.checkNotNull(compressionCodec);
this.maxBuffersPerChannel = maxBuffersPerChannel;
@@ -156,10 +152,6 @@ public class NettyShuffleEnvironmentConfiguration {
return blockingSubpartitionType;
}
- public boolean isForcePartitionReleaseOnConsumption() {
- return forcePartitionReleaseOnConsumption;
- }
-
public boolean isBlockingShuffleCompressionEnabled() {
return blockingShuffleCompressionEnabled;
}
@@ -218,9 +210,6 @@ public class NettyShuffleEnvironmentConfiguration {
BoundedBlockingSubpartitionType blockingSubpartitionType = getBlockingSubpartitionType(configuration);
- boolean forcePartitionReleaseOnConsumption =
- configuration.getBoolean(NettyShuffleEnvironmentOptions.FORCE_PARTITION_RELEASE_ON_CONSUMPTION);
-
boolean blockingShuffleCompressionEnabled =
configuration.get(NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED);
String compressionCodec = configuration.getString(NettyShuffleEnvironmentOptions.SHUFFLE_COMPRESSION_CODEC);
@@ -237,7 +226,6 @@ public class NettyShuffleEnvironmentConfiguration {
nettyConfig,
tempDirs,
blockingSubpartitionType,
- forcePartitionReleaseOnConsumption,
blockingShuffleCompressionEnabled,
compressionCodec,
maxBuffersPerChannel);
@@ -359,7 +347,6 @@ public class NettyShuffleEnvironmentConfiguration {
result = 31 * result + requestSegmentsTimeout.hashCode();
result = 31 * result + (nettyConfig != null ? nettyConfig.hashCode() : 0);
result = 31 * result + Arrays.hashCode(tempDirs);
- result = 31 * result + (forcePartitionReleaseOnConsumption ? 1 : 0);
result = 31 * result + (blockingShuffleCompressionEnabled ? 1 : 0);
result = 31 * result + Objects.hashCode(compressionCodec);
result = 31 * result + maxBuffersPerChannel;
@@ -386,7 +373,6 @@ public class NettyShuffleEnvironmentConfiguration {
this.requestSegmentsTimeout.equals(that.requestSegmentsTimeout) &&
(nettyConfig != null ? nettyConfig.equals(that.nettyConfig) : that.nettyConfig == null) &&
Arrays.equals(this.tempDirs, that.tempDirs) &&
- this.forcePartitionReleaseOnConsumption == that.forcePartitionReleaseOnConsumption &&
this.blockingShuffleCompressionEnabled == that.blockingShuffleCompressionEnabled &&
this.maxBuffersPerChannel == that.maxBuffersPerChannel &&
Objects.equals(this.compressionCodec, that.compressionCodec);
@@ -405,7 +391,6 @@ public class NettyShuffleEnvironmentConfiguration {
", requestSegmentsTimeout=" + requestSegmentsTimeout +
", nettyConfig=" + nettyConfig +
", tempDirs=" + Arrays.toString(tempDirs) +
- ", forcePartitionReleaseOnConsumption=" + forcePartitionReleaseOnConsumption +
", blockingShuffleCompressionEnabled=" + blockingShuffleCompressionEnabled +
", compressionCodec=" + compressionCodec +
", maxBuffersPerChannel=" + maxBuffersPerChannel +
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
index ab999fc..2551694 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
@@ -154,7 +154,6 @@ public class NettyShuffleEnvironmentBuilder {
nettyConfig,
DEFAULT_TEMP_DIRS,
BoundedBlockingSubpartitionType.AUTO,
- false,
blockingShuffleCompressionEnabled,
compressionCodec,
maxBuffersPerChannel),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
index e1c72e0..8eac7bd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
@@ -467,14 +467,13 @@ public class PartitionRequestQueueTest {
channel.runPendingTasks();
assertFalse(queue.getAvailableReaders().contains(reader));
- // the partition and its reader view should all be released
+
+ // the reader view should be released (the partition is not, though, blocking partitions
+ // support multiple successive readers for recovery and caching)
assertTrue(reader.isReleased());
- assertTrue(partition.isReleased());
- for (ResultSubpartition subpartition : partition.getAllPartitions()) {
- assertTrue(subpartition.isReleased());
- }
// cleanup
+ partition.release();
channel.close();
}
@@ -483,7 +482,6 @@ public class PartitionRequestQueueTest {
.setResultPartitionType(ResultPartitionType.BLOCKING)
.setFileChannelManager(fileChannelManager)
.setResultPartitionManager(partitionManager)
- .isReleasedOnConsumption(true)
.build();
partitionManager.registerResultPartition(partition);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartitionReleaseOnConsumptionTest.java
similarity index 88%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartitionTest.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartitionReleaseOnConsumptionTest.java
index ce7467f..f14f1db 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartitionReleaseOnConsumptionTest.java
@@ -25,16 +25,16 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
- * Tests for the {@link ReleaseOnConsumptionResultPartitionTest}.
+ * Tests for the {@link PipelinedResultPartitionReleaseOnConsumptionTest}.
*/
-public class ReleaseOnConsumptionResultPartitionTest extends TestLogger {
+public class PipelinedResultPartitionReleaseOnConsumptionTest extends TestLogger {
@Test
public void testConsumptionBasedPartitionRelease() {
final ResultPartitionManager manager = new ResultPartitionManager();
final ResultPartition partition = new ResultPartitionBuilder()
+ .setResultPartitionType(ResultPartitionType.PIPELINED)
.setNumberOfSubpartitions(2)
- .isReleasedOnConsumption(true)
.setResultPartitionManager(manager)
.build();
@@ -51,8 +51,8 @@ public class ReleaseOnConsumptionResultPartitionTest extends TestLogger {
public void testMultipleReleaseCallsAreIdempotent() {
final ResultPartitionManager manager = new ResultPartitionManager();
final ResultPartition partition = new ResultPartitionBuilder()
+ .setResultPartitionType(ResultPartitionType.PIPELINED)
.setNumberOfSubpartitions(2)
- .isReleasedOnConsumption(true)
.setResultPartitionManager(manager)
.build();
manager.registerResultPartition(partition);
@@ -67,8 +67,8 @@ public class ReleaseOnConsumptionResultPartitionTest extends TestLogger {
public void testReleaseAfterIdempotentCalls() {
final ResultPartitionManager manager = new ResultPartitionManager();
final ResultPartition partition = new ResultPartitionBuilder()
+ .setResultPartitionType(ResultPartitionType.PIPELINED)
.setNumberOfSubpartitions(2)
- .isReleasedOnConsumption(true)
.setResultPartitionManager(manager)
.build();
manager.registerResultPartition(partition);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
index 7527408..1fb4f3c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
@@ -63,8 +63,6 @@ public class ResultPartitionBuilder {
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private Optional<FunctionWithException<BufferPoolOwner, BufferPool, IOException>> bufferPoolFactory = Optional.empty();
- private boolean releasedOnConsumption;
-
private boolean blockingShuffleCompressionEnabled = false;
private String compressionCodec = "LZ4";
@@ -137,11 +135,6 @@ public class ResultPartitionBuilder {
return this;
}
- public ResultPartitionBuilder isReleasedOnConsumption(boolean releasedOnConsumption) {
- this.releasedOnConsumption = releasedOnConsumption;
- return this;
- }
-
public ResultPartitionBuilder setBlockingShuffleCompressionEnabled(boolean blockingShuffleCompressionEnabled) {
this.blockingShuffleCompressionEnabled = blockingShuffleCompressionEnabled;
return this;
@@ -167,7 +160,6 @@ public class ResultPartitionBuilder {
networkBuffersPerChannel,
floatingNetworkBuffersPerGate,
networkBufferSize,
- releasedOnConsumption,
blockingShuffleCompressionEnabled,
compressionCodec,
maxBuffersPerChannel);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
index ad81759..acb07cb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
@@ -59,37 +59,29 @@ public class ResultPartitionFactoryTest extends TestLogger {
@Test
public void testBoundedBlockingSubpartitionsCreated() {
- final ResultPartition resultPartition = createResultPartition(false, ResultPartitionType.BLOCKING);
+ final ResultPartition resultPartition = createResultPartition(ResultPartitionType.BLOCKING);
Arrays.stream(resultPartition.subpartitions).forEach(sp -> assertThat(sp, instanceOf(BoundedBlockingSubpartition.class)));
}
@Test
public void testPipelinedSubpartitionsCreated() {
- final ResultPartition resultPartition = createResultPartition(false, ResultPartitionType.PIPELINED);
+ final ResultPartition resultPartition = createResultPartition(ResultPartitionType.PIPELINED);
Arrays.stream(resultPartition.subpartitions).forEach(sp -> assertThat(sp, instanceOf(PipelinedSubpartition.class)));
}
@Test
- public void testConsumptionOnReleaseForced() {
- final ResultPartition resultPartition = createResultPartition(true, ResultPartitionType.BLOCKING);
+ public void testConsumptionOnReleaseForPipelined() {
+ final ResultPartition resultPartition = createResultPartition(ResultPartitionType.PIPELINED);
assertThat(resultPartition, instanceOf(ReleaseOnConsumptionResultPartition.class));
}
@Test
- public void testConsumptionOnReleaseEnabledForNonBlocking() {
- final ResultPartition resultPartition = createResultPartition(false, ResultPartitionType.PIPELINED);
- assertThat(resultPartition, instanceOf(ReleaseOnConsumptionResultPartition.class));
- }
-
- @Test
- public void testConsumptionOnReleaseDisabled() {
- final ResultPartition resultPartition = createResultPartition(false, ResultPartitionType.BLOCKING);
+ public void testNoConsumptionOnReleaseForBlocking() {
+ final ResultPartition resultPartition = createResultPartition(ResultPartitionType.BLOCKING);
assertThat(resultPartition, not(instanceOf(ReleaseOnConsumptionResultPartition.class)));
}
- private static ResultPartition createResultPartition(
- boolean releasePartitionOnConsumption,
- ResultPartitionType partitionType) {
+ private static ResultPartition createResultPartition(ResultPartitionType partitionType) {
ResultPartitionFactory factory = new ResultPartitionFactory(
new ResultPartitionManager(),
fileChannelManager,
@@ -98,7 +90,6 @@ public class ResultPartitionFactoryTest extends TestLogger {
1,
1,
SEGMENT_SIZE,
- releasePartitionOnConsumption,
false,
"LZ4",
Integer.MAX_VALUE);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 6d21ce1..23e61c8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -159,7 +159,6 @@ public class ResultPartitionTest {
ResultPartitionManager manager = new ResultPartitionManager();
final ResultPartition partition = new ResultPartitionBuilder()
- .isReleasedOnConsumption(false)
.setResultPartitionManager(manager)
.setResultPartitionType(ResultPartitionType.BLOCKING)
.setFileChannelManager(fileChannelManager)