Posted to commits@flink.apache.org by ch...@apache.org on 2019/08/02 07:44:00 UTC

[flink] 03/15: [hotfix][network] fix codestyle issues in ResultPartitionFactory

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a7869bcf3578cacc188b09c6d1f0193c11eb39c8
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Mon Jul 29 17:27:50 2019 +0300

    [hotfix][network] fix codestyle issues in ResultPartitionFactory
---
 .../network/partition/ResultPartitionFactory.java  | 49 ++++++++++------------
 1 file changed, 21 insertions(+), 28 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index b390987..0656e6e 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -33,8 +33,6 @@ import org.apache.flink.util.function.FunctionWithException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nonnull;
-
 import java.io.File;
 import java.io.IOException;
 import java.util.Optional;
@@ -46,13 +44,10 @@ public class ResultPartitionFactory {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ResultPartitionFactory.class);
 
-	@Nonnull
 	private final ResultPartitionManager partitionManager;
 
-	@Nonnull
 	private final FileChannelManager channelManager;
 
-	@Nonnull
 	private final BufferPoolFactory bufferPoolFactory;
 
 	private final BoundedBlockingSubpartitionType blockingSubpartitionType;
@@ -66,9 +61,9 @@ public class ResultPartitionFactory {
 	private final boolean forcePartitionReleaseOnConsumption;
 
 	public ResultPartitionFactory(
-		@Nonnull ResultPartitionManager partitionManager,
-		@Nonnull FileChannelManager channelManager,
-		@Nonnull BufferPoolFactory bufferPoolFactory,
+		ResultPartitionManager partitionManager,
+		FileChannelManager channelManager,
+		BufferPoolFactory bufferPoolFactory,
 		BoundedBlockingSubpartitionType blockingSubpartitionType,
 		int networkBuffersPerChannel,
 		int floatingNetworkBuffersPerGate,
@@ -86,9 +81,8 @@ public class ResultPartitionFactory {
 	}
 
 	public ResultPartition create(
-		@Nonnull String taskNameWithSubtaskAndId,
-		@Nonnull ResultPartitionDeploymentDescriptor desc) {
-
+			String taskNameWithSubtaskAndId,
+			ResultPartitionDeploymentDescriptor desc) {
 		return create(
 			taskNameWithSubtaskAndId,
 			desc.getShuffleDescriptor().getResultPartitionID(),
@@ -100,13 +94,12 @@ public class ResultPartitionFactory {
 
 	@VisibleForTesting
 	public ResultPartition create(
-		@Nonnull String taskNameWithSubtaskAndId,
-		@Nonnull ResultPartitionID id,
-		@Nonnull ResultPartitionType type,
-		int numberOfSubpartitions,
-		int maxParallelism,
-		FunctionWithException<BufferPoolOwner, BufferPool, IOException> bufferPoolFactory) {
-
+			String taskNameWithSubtaskAndId,
+			ResultPartitionID id,
+			ResultPartitionType type,
+			int numberOfSubpartitions,
+			int maxParallelism,
+			FunctionWithException<BufferPoolOwner, BufferPool, IOException> bufferPoolFactory) {
 		ResultSubpartition[] subpartitions = new ResultSubpartition[numberOfSubpartitions];
 
 		ResultPartition partition = forcePartitionReleaseOnConsumption || !type.isBlocking()
@@ -139,10 +132,10 @@ public class ResultPartitionFactory {
 			ResultPartitionType type,
 			BoundedBlockingSubpartitionType blockingSubpartitionType,
 			ResultSubpartition[] subpartitions) {
-
 		// Create the subpartitions.
 		switch (type) {
 			case BLOCKING:
+			case BLOCKING_PERSISTENT:
 				initializeBoundedBlockingPartitions(subpartitions, partition, blockingSubpartitionType, networkBufferSize, channelManager);
 				break;
 
@@ -160,15 +153,14 @@ public class ResultPartitionFactory {
 	}
 
 	private static void initializeBoundedBlockingPartitions(
-		ResultSubpartition[] subpartitions,
-		ResultPartition parent,
-		BoundedBlockingSubpartitionType blockingSubpartitionType,
-		int networkBufferSize,
-		FileChannelManager channelManager) {
-
+			ResultSubpartition[] subpartitions,
+			ResultPartition parent,
+			BoundedBlockingSubpartitionType blockingSubpartitionType,
+			int networkBufferSize,
+			FileChannelManager channelManager) {
 		int i = 0;
 		try {
-			for (; i < subpartitions.length; i++) {
+			for (i = 0; i < subpartitions.length; i++) {
 				final File spillFile = channelManager.createChannel().getPathFile();
 				subpartitions[i] = blockingSubpartitionType.create(i, parent, spillFile, networkBufferSize);
 			}
@@ -194,8 +186,8 @@ public class ResultPartitionFactory {
 
 	@VisibleForTesting
 	FunctionWithException<BufferPoolOwner, BufferPool, IOException> createBufferPoolFactory(
-		int numberOfSubpartitions, ResultPartitionType type) {
-
+			int numberOfSubpartitions,
+			ResultPartitionType type) {
 		return p -> {
 			int maxNumberOfMemorySegments = type.isBounded() ?
 				numberOfSubpartitions * networkBuffersPerChannel + floatingNetworkBuffersPerGate : Integer.MAX_VALUE;
@@ -213,6 +205,7 @@ public class ResultPartitionFactory {
 				return BoundedBlockingSubpartitionType.FILE_MMAP;
 			case _32_BIT:
 				return BoundedBlockingSubpartitionType.FILE;
+			case UNKNOWN:
 			default:
 				LOG.warn("Cannot determine memory architecture. Using pure file-based shuffle.");
 				return BoundedBlockingSubpartitionType.FILE;
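
For reference, the style conventions applied throughout this commit are: dropping @Nonnull annotations (consistent with treating parameters and fields as non-null by default and annotating only nullable ones), indenting continuation lines of multi-line signatures by an extra level, and starting the method body directly without a leading blank line. A minimal sketch of the resulting shape, using a hypothetical ExampleFactory class that is not part of the Flink codebase:

    // Hedged sketch of the conventions shown in the diff above.
    // ExampleFactory, ownerName and bufferSize are illustrative names only.
    public class ExampleFactory {

    	private final String ownerName;

    	private final int bufferSize;

    	public ExampleFactory(
    			String ownerName,       // continuation lines indented one level deeper than the body
    			int bufferSize) {
    		this.ownerName = ownerName; // body starts immediately, no blank line after the signature
    		this.bufferSize = bufferSize;
    	}
    }

The deeper indentation of the parameter list appears intended to visually separate the signature's continuation lines from the method body, which uses a single level of indentation.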