You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/01 14:41:19 UTC

[flink] 10/16: [hotfix][network] Split InputProcessorUtil into smaller methods

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2e5c499d6263b59183dc93d56b2bb27684eeecb1
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Tue Jun 18 16:57:19 2019 +0200

    [hotfix][network] Split InputProcessorUtil into smaller methods
---
 .../streaming/runtime/io/InputProcessorUtil.java   | 79 ++++++++++++++--------
 1 file changed, 49 insertions(+), 30 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index 7eda06c..419cf16 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -24,8 +24,8 @@ import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
 import java.io.IOException;
 
@@ -37,44 +37,63 @@ import java.io.IOException;
 public class InputProcessorUtil {
 
 	public static CheckpointedInputGate createCheckpointedInputGate(
-			StreamTask<?, ?> checkpointedTask,
+			AbstractInvokable toNotifyOnCheckpoint,
 			CheckpointingMode checkpointMode,
 			IOManager ioManager,
 			InputGate inputGate,
 			Configuration taskManagerConfig,
 			String taskName) throws IOException {
 
-		CheckpointedInputGate checkpointedInputGate;
-		if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
-			long maxAlign = taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
-			if (!(maxAlign == -1 || maxAlign > 0)) {
-				throw new IllegalConfigurationException(
-					TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
-					+ " must be positive or -1 (infinite)");
-			}
+		BufferStorage bufferStorage = createBufferStorage(
+			checkpointMode, ioManager, inputGate.getPageSize(), taskManagerConfig, taskName);
+		CheckpointBarrierHandler barrierHandler = createCheckpointBarrierHandler(
+			checkpointMode, inputGate.getNumberOfInputChannels(), taskName, toNotifyOnCheckpoint);
+		return new CheckpointedInputGate(inputGate, bufferStorage, barrierHandler);
+	}
 
-			if (taskManagerConfig.getBoolean(NettyShuffleEnvironmentOptions.NETWORK_CREDIT_MODEL)) {
-				checkpointedInputGate = new CheckpointedInputGate(
-					inputGate,
-					new CachedBufferStorage(inputGate.getPageSize(), maxAlign, taskName),
-					taskName,
-					checkpointedTask);
-			} else {
-				checkpointedInputGate = new CheckpointedInputGate(
-					inputGate,
-					new BufferSpiller(ioManager, inputGate.getPageSize(), maxAlign, taskName),
+	private static CheckpointBarrierHandler createCheckpointBarrierHandler(
+			CheckpointingMode checkpointMode,
+			int numberOfInputChannels,
+			String taskName,
+			AbstractInvokable toNotifyOnCheckpoint) {
+		switch (checkpointMode) {
+			case EXACTLY_ONCE:
+				return new CheckpointBarrierAligner(
+					numberOfInputChannels,
 					taskName,
-					checkpointedTask);
-			}
-		} else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
-			checkpointedInputGate = new CheckpointedInputGate(
-				inputGate,
-				new EmptyBufferStorage(),
-				new CheckpointBarrierTracker(inputGate.getNumberOfInputChannels(), checkpointedTask));
-		} else {
-			throw new IllegalArgumentException("Unrecognized Checkpointing Mode: " + checkpointMode);
+					toNotifyOnCheckpoint);
+			case AT_LEAST_ONCE:
+				return new CheckpointBarrierTracker(numberOfInputChannels, toNotifyOnCheckpoint);
+			default:
+				throw new UnsupportedOperationException("Unrecognized Checkpointing Mode: " + checkpointMode);
 		}
+	}
 
-		return checkpointedInputGate;
+	private static BufferStorage createBufferStorage(
+			CheckpointingMode checkpointMode,
+			IOManager ioManager,
+			int pageSize,
+			Configuration taskManagerConfig,
+			String taskName) throws IOException {
+		switch (checkpointMode) {
+			case EXACTLY_ONCE: {
+				long maxAlign = taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
+				if (!(maxAlign == -1 || maxAlign > 0)) {
+					throw new IllegalConfigurationException(
+						TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
+							+ " must be positive or -1 (infinite)");
+				}
+
+				if (taskManagerConfig.getBoolean(NettyShuffleEnvironmentOptions.NETWORK_CREDIT_MODEL)) {
+					return new CachedBufferStorage(pageSize, maxAlign, taskName);
+				} else {
+					return new BufferSpiller(ioManager, pageSize, maxAlign, taskName);
+				}
+			}
+			case AT_LEAST_ONCE:
+				return new EmptyBufferStorage();
+			default:
+				throw new UnsupportedOperationException("Unrecognized Checkpointing Mode: " + checkpointMode);
+		}
 	}
 }