You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/04 08:25:48 UTC

[flink] 01/04: [hotfix][runtime] Make BufferStorage independent with InputGate#getPageSize

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 18990694eb1fc8c18b66a31960bee6f4f163c43b
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Tue Jul 2 10:46:09 2019 +0800

    [hotfix][runtime] Make BufferStorage independent with InputGate#getPageSize
    
    The page size could be got directly from configuration during constructing BufferStorage instance instead of from InputGate,
    then we could remove abstract getPageSize method from InputGate later.
---
 .../apache/flink/streaming/runtime/io/InputProcessorUtil.java | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index 800c33e..4f2a07c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.util.ConfigurationParserUtils;
 import org.apache.flink.streaming.api.CheckpointingMode;
 
 import java.io.IOException;
@@ -46,8 +47,10 @@ public class InputProcessorUtil {
 			Configuration taskManagerConfig,
 			String taskName) throws IOException {
 
+		int pageSize = ConfigurationParserUtils.getPageSize(taskManagerConfig);
+
 		BufferStorage bufferStorage = createBufferStorage(
-			checkpointMode, ioManager, inputGate.getPageSize(), taskManagerConfig, taskName);
+			checkpointMode, ioManager, pageSize, taskManagerConfig, taskName);
 		CheckpointBarrierHandler barrierHandler = createCheckpointBarrierHandler(
 			checkpointMode, inputGate.getNumberOfInputChannels(), taskName, toNotifyOnCheckpoint);
 		return new CheckpointedInputGate(inputGate, bufferStorage, barrierHandler);
@@ -66,10 +69,12 @@ public class InputProcessorUtil {
 			Configuration taskManagerConfig,
 			String taskName) throws IOException {
 
+		int pageSize = ConfigurationParserUtils.getPageSize(taskManagerConfig);
+
 		BufferStorage mainBufferStorage1 = createBufferStorage(
-			checkpointMode, ioManager, inputGate1.getPageSize(), taskManagerConfig, taskName);
+			checkpointMode, ioManager, pageSize, taskManagerConfig, taskName);
 		BufferStorage mainBufferStorage2 = createBufferStorage(
-			checkpointMode, ioManager, inputGate2.getPageSize(), taskManagerConfig, taskName);
+			checkpointMode, ioManager, pageSize, taskManagerConfig, taskName);
 		checkState(mainBufferStorage1.getMaxBufferedBytes() == mainBufferStorage2.getMaxBufferedBytes());
 
 		BufferStorage linkedBufferStorage1 = new LinkedBufferStorage(