You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/08/02 07:44:00 UTC
[flink] 03/15: [hotfix][network] fix codestyle issues in ResultPartitionFactory
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a7869bcf3578cacc188b09c6d1f0193c11eb39c8
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Mon Jul 29 17:27:50 2019 +0300
[hotfix][network] fix codestyle issues in ResultPartitionFactory
---
.../network/partition/ResultPartitionFactory.java | 49 ++++++++++------------
1 file changed, 21 insertions(+), 28 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index b390987..0656e6e 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -33,8 +33,6 @@ import org.apache.flink.util.function.FunctionWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nonnull;
-
import java.io.File;
import java.io.IOException;
import java.util.Optional;
@@ -46,13 +44,10 @@ public class ResultPartitionFactory {
private static final Logger LOG = LoggerFactory.getLogger(ResultPartitionFactory.class);
- @Nonnull
private final ResultPartitionManager partitionManager;
- @Nonnull
private final FileChannelManager channelManager;
- @Nonnull
private final BufferPoolFactory bufferPoolFactory;
private final BoundedBlockingSubpartitionType blockingSubpartitionType;
@@ -66,9 +61,9 @@ public class ResultPartitionFactory {
private final boolean forcePartitionReleaseOnConsumption;
public ResultPartitionFactory(
- @Nonnull ResultPartitionManager partitionManager,
- @Nonnull FileChannelManager channelManager,
- @Nonnull BufferPoolFactory bufferPoolFactory,
+ ResultPartitionManager partitionManager,
+ FileChannelManager channelManager,
+ BufferPoolFactory bufferPoolFactory,
BoundedBlockingSubpartitionType blockingSubpartitionType,
int networkBuffersPerChannel,
int floatingNetworkBuffersPerGate,
@@ -86,9 +81,8 @@ public class ResultPartitionFactory {
}
public ResultPartition create(
- @Nonnull String taskNameWithSubtaskAndId,
- @Nonnull ResultPartitionDeploymentDescriptor desc) {
-
+ String taskNameWithSubtaskAndId,
+ ResultPartitionDeploymentDescriptor desc) {
return create(
taskNameWithSubtaskAndId,
desc.getShuffleDescriptor().getResultPartitionID(),
@@ -100,13 +94,12 @@ public class ResultPartitionFactory {
@VisibleForTesting
public ResultPartition create(
- @Nonnull String taskNameWithSubtaskAndId,
- @Nonnull ResultPartitionID id,
- @Nonnull ResultPartitionType type,
- int numberOfSubpartitions,
- int maxParallelism,
- FunctionWithException<BufferPoolOwner, BufferPool, IOException> bufferPoolFactory) {
-
+ String taskNameWithSubtaskAndId,
+ ResultPartitionID id,
+ ResultPartitionType type,
+ int numberOfSubpartitions,
+ int maxParallelism,
+ FunctionWithException<BufferPoolOwner, BufferPool, IOException> bufferPoolFactory) {
ResultSubpartition[] subpartitions = new ResultSubpartition[numberOfSubpartitions];
ResultPartition partition = forcePartitionReleaseOnConsumption || !type.isBlocking()
@@ -139,10 +132,10 @@ public class ResultPartitionFactory {
ResultPartitionType type,
BoundedBlockingSubpartitionType blockingSubpartitionType,
ResultSubpartition[] subpartitions) {
-
// Create the subpartitions.
switch (type) {
case BLOCKING:
+ case BLOCKING_PERSISTENT:
initializeBoundedBlockingPartitions(subpartitions, partition, blockingSubpartitionType, networkBufferSize, channelManager);
break;
@@ -160,15 +153,14 @@ public class ResultPartitionFactory {
}
private static void initializeBoundedBlockingPartitions(
- ResultSubpartition[] subpartitions,
- ResultPartition parent,
- BoundedBlockingSubpartitionType blockingSubpartitionType,
- int networkBufferSize,
- FileChannelManager channelManager) {
-
+ ResultSubpartition[] subpartitions,
+ ResultPartition parent,
+ BoundedBlockingSubpartitionType blockingSubpartitionType,
+ int networkBufferSize,
+ FileChannelManager channelManager) {
int i = 0;
try {
- for (; i < subpartitions.length; i++) {
+ for (i = 0; i < subpartitions.length; i++) {
final File spillFile = channelManager.createChannel().getPathFile();
subpartitions[i] = blockingSubpartitionType.create(i, parent, spillFile, networkBufferSize);
}
@@ -194,8 +186,8 @@ public class ResultPartitionFactory {
@VisibleForTesting
FunctionWithException<BufferPoolOwner, BufferPool, IOException> createBufferPoolFactory(
- int numberOfSubpartitions, ResultPartitionType type) {
-
+ int numberOfSubpartitions,
+ ResultPartitionType type) {
return p -> {
int maxNumberOfMemorySegments = type.isBounded() ?
numberOfSubpartitions * networkBuffersPerChannel + floatingNetworkBuffersPerGate : Integer.MAX_VALUE;
@@ -213,6 +205,7 @@ public class ResultPartitionFactory {
return BoundedBlockingSubpartitionType.FILE_MMAP;
case _32_BIT:
return BoundedBlockingSubpartitionType.FILE;
+ case UNKNOWN:
default:
LOG.warn("Cannot determine memory architecture. Using pure file-based shuffle.");
return BoundedBlockingSubpartitionType.FILE;