You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/01 14:41:19 UTC
[flink] 10/16: [hotfix][network] Split InputProcessorUtil into smaller methods
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2e5c499d6263b59183dc93d56b2bb27684eeecb1
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Tue Jun 18 16:57:19 2019 +0200
[hotfix][network] Split InputProcessorUtil into smaller methods
---
.../streaming/runtime/io/InputProcessorUtil.java | 79 ++++++++++++++--------
1 file changed, 49 insertions(+), 30 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index 7eda06c..419cf16 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -24,8 +24,8 @@ import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
import java.io.IOException;
@@ -37,44 +37,63 @@ import java.io.IOException;
public class InputProcessorUtil {
public static CheckpointedInputGate createCheckpointedInputGate(
- StreamTask<?, ?> checkpointedTask,
+ AbstractInvokable toNotifyOnCheckpoint,
CheckpointingMode checkpointMode,
IOManager ioManager,
InputGate inputGate,
Configuration taskManagerConfig,
String taskName) throws IOException {
- CheckpointedInputGate checkpointedInputGate;
- if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
- long maxAlign = taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
- if (!(maxAlign == -1 || maxAlign > 0)) {
- throw new IllegalConfigurationException(
- TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
- + " must be positive or -1 (infinite)");
- }
+ BufferStorage bufferStorage = createBufferStorage(
+ checkpointMode, ioManager, inputGate.getPageSize(), taskManagerConfig, taskName);
+ CheckpointBarrierHandler barrierHandler = createCheckpointBarrierHandler(
+ checkpointMode, inputGate.getNumberOfInputChannels(), taskName, toNotifyOnCheckpoint);
+ return new CheckpointedInputGate(inputGate, bufferStorage, barrierHandler);
+ }
- if (taskManagerConfig.getBoolean(NettyShuffleEnvironmentOptions.NETWORK_CREDIT_MODEL)) {
- checkpointedInputGate = new CheckpointedInputGate(
- inputGate,
- new CachedBufferStorage(inputGate.getPageSize(), maxAlign, taskName),
- taskName,
- checkpointedTask);
- } else {
- checkpointedInputGate = new CheckpointedInputGate(
- inputGate,
- new BufferSpiller(ioManager, inputGate.getPageSize(), maxAlign, taskName),
+ private static CheckpointBarrierHandler createCheckpointBarrierHandler(
+ CheckpointingMode checkpointMode,
+ int numberOfInputChannels,
+ String taskName,
+ AbstractInvokable toNotifyOnCheckpoint) {
+ switch (checkpointMode) {
+ case EXACTLY_ONCE:
+ return new CheckpointBarrierAligner(
+ numberOfInputChannels,
taskName,
- checkpointedTask);
- }
- } else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
- checkpointedInputGate = new CheckpointedInputGate(
- inputGate,
- new EmptyBufferStorage(),
- new CheckpointBarrierTracker(inputGate.getNumberOfInputChannels(), checkpointedTask));
- } else {
- throw new IllegalArgumentException("Unrecognized Checkpointing Mode: " + checkpointMode);
+ toNotifyOnCheckpoint);
+ case AT_LEAST_ONCE:
+ return new CheckpointBarrierTracker(numberOfInputChannels, toNotifyOnCheckpoint);
+ default:
+ throw new UnsupportedOperationException("Unrecognized Checkpointing Mode: " + checkpointMode);
}
+ }
- return checkpointedInputGate;
+ private static BufferStorage createBufferStorage(
+ CheckpointingMode checkpointMode,
+ IOManager ioManager,
+ int pageSize,
+ Configuration taskManagerConfig,
+ String taskName) throws IOException {
+ switch (checkpointMode) {
+ case EXACTLY_ONCE: {
+ long maxAlign = taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
+ if (!(maxAlign == -1 || maxAlign > 0)) {
+ throw new IllegalConfigurationException(
+ TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
+ + " must be positive or -1 (infinite)");
+ }
+
+ if (taskManagerConfig.getBoolean(NettyShuffleEnvironmentOptions.NETWORK_CREDIT_MODEL)) {
+ return new CachedBufferStorage(pageSize, maxAlign, taskName);
+ } else {
+ return new BufferSpiller(ioManager, pageSize, maxAlign, taskName);
+ }
+ }
+ case AT_LEAST_ONCE:
+ return new EmptyBufferStorage();
+ default:
+ throw new UnsupportedOperationException("Unrecognized Checkpointing Mode: " + checkpointMode);
+ }
}
}