You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/04 08:25:47 UTC

[flink] branch master updated (a39ade1 -> 666aeaf)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from a39ade1  [FLINK-12963] [state-processor] Make OperatorTransformation non-instantiable
     new 1899069  [hotfix][runtime] Make BufferStorage independent with InputGate#getPageSize
     new b6563e3  [FLINK-12738][network] Remove abstract getPageSize method from InputGate
     new 4004835  [hotfix][network] Remove getMemorySegmentSize method from BufferProvider interface
     new 666aeaf  [hotfix][network] Remove unnecessary getMemorySegmentSize method from NetworkBufferPool

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/io/network/buffer/BufferProvider.java  |  7 --
 .../runtime/io/network/buffer/LocalBufferPool.java |  5 --
 .../io/network/buffer/NetworkBufferPool.java       |  4 --
 .../io/network/partition/consumer/InputGate.java   |  2 -
 .../partition/consumer/SingleInputGate.java        | 10 ---
 .../network/partition/consumer/UnionInputGate.java | 13 ----
 .../runtime/taskmanager/InputGateWithMetrics.java  |  5 --
 .../io/network/buffer/NetworkBufferPoolTest.java   |  1 -
 .../runtime/io/network/util/TestBufferFactory.java |  4 --
 .../io/network/util/TestPooledBufferProvider.java  |  5 --
 .../streaming/runtime/io/InputProcessorUtil.java   | 11 ++-
 .../consumer/StreamTestSingleInputGate.java        | 82 ----------------------
 ...CheckpointBarrierAlignerAlignmentLimitTest.java |  8 +--
 .../CheckpointBarrierAlignerMassiveRandomTest.java |  7 +-
 .../io/CheckpointBarrierAlignerTestBase.java       |  2 +-
 .../runtime/io/CheckpointBarrierTrackerTest.java   |  2 +-
 .../flink/streaming/runtime/io/MockInputGate.java  | 10 +--
 17 files changed, 16 insertions(+), 162 deletions(-)


[flink] 01/04: [hotfix][runtime] Make BufferStorage independent with InputGate#getPageSize

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 18990694eb1fc8c18b66a31960bee6f4f163c43b
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Tue Jul 2 10:46:09 2019 +0800

    [hotfix][runtime] Make BufferStorage independent with InputGate#getPageSize
    
    The page size can be obtained directly from the configuration when constructing the BufferStorage instance instead of from InputGate,
    so that the abstract getPageSize method can be removed from InputGate later.
---
 .../apache/flink/streaming/runtime/io/InputProcessorUtil.java | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index 800c33e..4f2a07c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.util.ConfigurationParserUtils;
 import org.apache.flink.streaming.api.CheckpointingMode;
 
 import java.io.IOException;
@@ -46,8 +47,10 @@ public class InputProcessorUtil {
 			Configuration taskManagerConfig,
 			String taskName) throws IOException {
 
+		int pageSize = ConfigurationParserUtils.getPageSize(taskManagerConfig);
+
 		BufferStorage bufferStorage = createBufferStorage(
-			checkpointMode, ioManager, inputGate.getPageSize(), taskManagerConfig, taskName);
+			checkpointMode, ioManager, pageSize, taskManagerConfig, taskName);
 		CheckpointBarrierHandler barrierHandler = createCheckpointBarrierHandler(
 			checkpointMode, inputGate.getNumberOfInputChannels(), taskName, toNotifyOnCheckpoint);
 		return new CheckpointedInputGate(inputGate, bufferStorage, barrierHandler);
@@ -66,10 +69,12 @@ public class InputProcessorUtil {
 			Configuration taskManagerConfig,
 			String taskName) throws IOException {
 
+		int pageSize = ConfigurationParserUtils.getPageSize(taskManagerConfig);
+
 		BufferStorage mainBufferStorage1 = createBufferStorage(
-			checkpointMode, ioManager, inputGate1.getPageSize(), taskManagerConfig, taskName);
+			checkpointMode, ioManager, pageSize, taskManagerConfig, taskName);
 		BufferStorage mainBufferStorage2 = createBufferStorage(
-			checkpointMode, ioManager, inputGate2.getPageSize(), taskManagerConfig, taskName);
+			checkpointMode, ioManager, pageSize, taskManagerConfig, taskName);
 		checkState(mainBufferStorage1.getMaxBufferedBytes() == mainBufferStorage2.getMaxBufferedBytes());
 
 		BufferStorage linkedBufferStorage1 = new LinkedBufferStorage(


[flink] 04/04: [hotfix][network] Remove unnecessary getMemorySegmentSize method from NetworkBufferPool

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 666aeaf5d577e5422233e2d15295fb52d5eb38f6
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Thu Jun 6 16:01:06 2019 +0800

    [hotfix][network] Remove unnecessary getMemorySegmentSize method from NetworkBufferPool
    
    LocalBufferPool no longer relies on NetworkBufferPool#getMemorySegmentSize, and the only remaining usage of this method
    is in tests, where it also seems unnecessary. So we can remove this method from NetworkBufferPool.
---
 .../org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java | 4 ----
 .../apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java | 1 -
 2 files changed, 5 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 0247ab7..87f0f06 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -230,10 +230,6 @@ public class NetworkBufferPool implements BufferPoolFactory, MemorySegmentProvid
 		return isDestroyed;
 	}
 
-	public int getMemorySegmentSize() {
-		return memorySegmentSize;
-	}
-
 	public int getTotalNumberOfMemorySegments() {
 		return totalNumberOfMemorySegments;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index 89f673c..cc6c3a0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -61,7 +61,6 @@ public class NetworkBufferPoolTest extends TestLogger {
 			final int numBuffers = 10;
 
 			NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, bufferSize, 1);
-			assertEquals(bufferSize, globalPool.getMemorySegmentSize());
 			assertEquals(numBuffers, globalPool.getTotalNumberOfMemorySegments());
 			assertEquals(numBuffers, globalPool.getNumberOfAvailableMemorySegments());
 			assertEquals(0, globalPool.getNumberOfRegisteredBufferPools());


[flink] 03/04: [hotfix][network] Remove getMemorySegmentSize method from BufferProvider interface

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 40048355b5a7ec442b321a4ed4c84a4ab791d74a
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Thu Jun 6 15:57:19 2019 +0800

    [hotfix][network] Remove getMemorySegmentSize method from BufferProvider interface
---
 .../runtime/io/network/buffer/BufferProvider.java  |  7 --
 .../runtime/io/network/buffer/LocalBufferPool.java |  5 --
 .../runtime/io/network/util/TestBufferFactory.java |  4 --
 .../io/network/util/TestPooledBufferProvider.java  |  5 --
 .../consumer/StreamTestSingleInputGate.java        | 82 ----------------------
 5 files changed, 103 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
index 843a2f6..f652d6b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
@@ -63,11 +63,4 @@ public interface BufferProvider {
 	 * Returns whether the buffer provider has been destroyed.
 	 */
 	boolean isDestroyed();
-
-	/**
-	 * Returns the size of the underlying memory segments. This is the maximum size a {@link Buffer}
-	 * instance can have.
-	 */
-	int getMemorySegmentSize();
-
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index dfad5b9..e65b6f9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -165,11 +165,6 @@ class LocalBufferPool implements BufferPool {
 	}
 
 	@Override
-	public int getMemorySegmentSize() {
-		return networkBufferPool.getMemorySegmentSize();
-	}
-
-	@Override
 	public int getNumberOfRequiredMemorySegments() {
 		return numberOfRequiredMemorySegments;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
index 406b81f..f6b2fa8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
@@ -64,10 +64,6 @@ public class TestBufferFactory {
 		return numberOfCreatedBuffers;
 	}
 
-	public synchronized int getBufferSize() {
-		return bufferSize;
-	}
-
 	// ------------------------------------------------------------------------
 	// Static test helpers
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
index eba5912..bae0830 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
@@ -96,11 +96,6 @@ public class TestPooledBufferProvider implements BufferProvider {
 		return false;
 	}
 
-	@Override
-	public int getMemorySegmentSize() {
-		return bufferFactory.getBufferSize();
-	}
-
 	public int getNumberOfAvailableBuffers() {
 		return buffers.size();
 	}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index 0ca68a9..291e15c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -21,16 +21,12 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.buffer.BufferListener;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
 import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel.BufferAndAvailabilityProvider;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
@@ -79,7 +75,6 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
 		inputQueues = new ConcurrentLinkedQueue[numInputChannels];
 
 		setupInputChannels();
-		inputGate.setBufferPool(new NoOpBufferPool(bufferSize));
 	}
 
 	@SuppressWarnings("unchecked")
@@ -226,81 +221,4 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
 			return isEvent;
 		}
 	}
-
-	private static class NoOpBufferPool implements BufferPool {
-		private int bufferSize;
-
-		public NoOpBufferPool(int bufferSize) {
-			this.bufferSize = bufferSize;
-		}
-
-		@Override
-		public void lazyDestroy() {
-		}
-
-		@Override
-		public int getMemorySegmentSize() {
-			return bufferSize;
-		}
-
-		@Override
-		public Buffer requestBuffer() throws IOException {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public Buffer requestBufferBlocking() throws IOException, InterruptedException {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public BufferBuilder requestBufferBuilderBlocking() throws IOException, InterruptedException {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public boolean addBufferListener(BufferListener listener) {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public boolean isDestroyed() {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public int getNumberOfRequiredMemorySegments() {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public int getMaxNumberOfMemorySegments() {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public int getNumBuffers() {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public void setNumBuffers(int numBuffers) throws IOException {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public int getNumberOfAvailableMemorySegments() {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public int bestEffortGetNumOfUsedBuffers() {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public void recycle(MemorySegment memorySegment) {
-			throw new UnsupportedOperationException();
-		}
-	}
 }


[flink] 02/04: [FLINK-12738][network] Remove abstract getPageSize method from InputGate

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b6563e385263beb556ce19855ca6dfdbb7f6c853
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Thu Jun 6 15:51:28 2019 +0800

    [FLINK-12738][network] Remove abstract getPageSize method from InputGate
    
    Currently InputGate#getPageSize is only used for testing purposes. In order to keep the abstract InputGate simple and clean,
    it is better to remove this unnecessary abstract method by refactoring the relevant tests not to rely on it.
---
 .../runtime/io/network/partition/consumer/InputGate.java    |  2 --
 .../io/network/partition/consumer/SingleInputGate.java      | 10 ----------
 .../io/network/partition/consumer/UnionInputGate.java       | 13 -------------
 .../flink/runtime/taskmanager/InputGateWithMetrics.java     |  5 -----
 .../io/CheckpointBarrierAlignerAlignmentLimitTest.java      |  8 ++++----
 .../io/CheckpointBarrierAlignerMassiveRandomTest.java       |  7 +------
 .../runtime/io/CheckpointBarrierAlignerTestBase.java        |  2 +-
 .../streaming/runtime/io/CheckpointBarrierTrackerTest.java  |  2 +-
 .../apache/flink/streaming/runtime/io/MockInputGate.java    | 10 +---------
 9 files changed, 8 insertions(+), 51 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index c297f70..0ce446b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -95,8 +95,6 @@ public abstract class InputGate implements AsyncDataInput<BufferOrEvent>, AutoCl
 
 	public abstract void sendTaskEvent(TaskEvent event) throws IOException;
 
-	public abstract int getPageSize();
-
 	/**
 	 * @return a future that is completed if there are more records available. If there are more
 	 * records available immediately, {@link #AVAILABLE} should be returned. Previously returned
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 7ba4298..b23572d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -243,16 +243,6 @@ public class SingleInputGate extends InputGate {
 		return bufferPool;
 	}
 
-	@Override
-	public int getPageSize() {
-		if (bufferPool != null) {
-			return bufferPool.getMemorySegmentSize();
-		}
-		else {
-			throw new IllegalStateException("Input gate has not been initialized with buffers.");
-		}
-	}
-
 	public int getNumberOfQueuedBuffers() {
 		// re-try 3 times, if fails, return 0 for "unknown"
 		for (int retry = 0; retry < 3; retry++) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index aa8f911..65a15ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -265,19 +265,6 @@ public class UnionInputGate extends InputGate {
 	}
 
 	@Override
-	public int getPageSize() {
-		int pageSize = -1;
-		for (InputGate gate : inputGates) {
-			if (pageSize == -1) {
-				pageSize = gate.getPageSize();
-			} else if (gate.getPageSize() != pageSize) {
-				throw new IllegalStateException("Found input gates with different page sizes.");
-			}
-		}
-		return pageSize;
-	}
-
-	@Override
 	public void setup() {
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
index 87d4b0f..669c02e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
@@ -86,11 +86,6 @@ public class InputGateWithMetrics extends InputGate {
 	}
 
 	@Override
-	public int getPageSize() {
-		return inputGate.getPageSize();
-	}
-
-	@Override
 	public void close() throws Exception {
 		inputGate.close();
 	}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java
index 341e291..b4aad0f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java
@@ -115,11 +115,11 @@ public class CheckpointBarrierAlignerAlignmentLimitTest {
 		};
 
 		// the barrier buffer has a limit that only 1000 bytes may be spilled in alignment
-		MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
+		MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
 		AbstractInvokable toNotify = mock(AbstractInvokable.class);
 		CheckpointedInputGate buffer = new CheckpointedInputGate(
 			gate,
-			new BufferSpiller(ioManager, gate.getPageSize(), 1000),
+			new BufferSpiller(ioManager, PAGE_SIZE, 1000),
 			"Testing",
 			toNotify);
 
@@ -212,11 +212,11 @@ public class CheckpointBarrierAlignerAlignmentLimitTest {
 		};
 
 		// the barrier buffer has a limit that only 1000 bytes may be spilled in alignment
-		MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
+		MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
 		AbstractInvokable toNotify = mock(AbstractInvokable.class);
 		CheckpointedInputGate buffer = new CheckpointedInputGate(
 			gate,
-			new BufferSpiller(ioManager, gate.getPageSize(), 500),
+			new BufferSpiller(ioManager, PAGE_SIZE, 500),
 			"Testing",
 			toNotify);
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java
index 2b452bd..552818d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java
@@ -60,7 +60,7 @@ public class CheckpointBarrierAlignerMassiveRandomTest {
 					new BufferPool[] { pool1, pool2 },
 					new BarrierGenerator[] { new CountBarrier(100000), new RandomBarrier(100000) });
 
-			CheckpointedInputGate checkpointedInputGate = new CheckpointedInputGate(myIG, new BufferSpiller(ioMan, myIG.getPageSize()), "Testing: No task associated", null);
+			CheckpointedInputGate checkpointedInputGate = new CheckpointedInputGate(myIG, new BufferSpiller(ioMan, PAGE_SIZE), "Testing: No task associated", null);
 
 			for (int i = 0; i < 2000000; i++) {
 				BufferOrEvent boe = checkpointedInputGate.pollNext().get();
@@ -182,11 +182,6 @@ public class CheckpointBarrierAlignerMassiveRandomTest {
 		public void sendTaskEvent(TaskEvent event) {}
 
 		@Override
-		public int getPageSize() {
-			return PAGE_SIZE;
-		}
-
-		@Override
 		public void setup() {
 		}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTestBase.java
index 687c95d..9e82775 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTestBase.java
@@ -76,7 +76,7 @@ public abstract class CheckpointBarrierAlignerTestBase {
 		int numberOfChannels,
 		BufferOrEvent[] sequence,
 		@Nullable AbstractInvokable toNotify) throws IOException {
-		MockInputGate gate = new MockInputGate(PAGE_SIZE, numberOfChannels, Arrays.asList(sequence));
+		MockInputGate gate = new MockInputGate(numberOfChannels, Arrays.asList(sequence));
 		return createBarrierBuffer(gate, toNotify);
 	}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierTrackerTest.java
index 2218680..3f2e592 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierTrackerTest.java
@@ -373,7 +373,7 @@ public class CheckpointBarrierTrackerTest {
 			int numberOfChannels,
 			BufferOrEvent[] sequence,
 			@Nullable AbstractInvokable toNotifyOnCheckpoint) {
-		MockInputGate gate = new MockInputGate(PAGE_SIZE, numberOfChannels, Arrays.asList(sequence));
+		MockInputGate gate = new MockInputGate(numberOfChannels, Arrays.asList(sequence));
 		return new CheckpointedInputGate(
 			gate,
 			new CachedBufferStorage(PAGE_SIZE, -1, "Testing"),
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
index bc37f74..5f95e17 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
@@ -33,8 +33,6 @@ import java.util.Queue;
  */
 public class MockInputGate extends InputGate {
 
-	private final int pageSize;
-
 	private final int numberOfChannels;
 
 	private final Queue<BufferOrEvent> bufferOrEvents;
@@ -43,8 +41,7 @@ public class MockInputGate extends InputGate {
 
 	private int closedChannels;
 
-	public MockInputGate(int pageSize, int numberOfChannels, List<BufferOrEvent> bufferOrEvents) {
-		this.pageSize = pageSize;
+	MockInputGate(int numberOfChannels, List<BufferOrEvent> bufferOrEvents) {
 		this.numberOfChannels = numberOfChannels;
 		this.bufferOrEvents = new ArrayDeque<BufferOrEvent>(bufferOrEvents);
 		this.closed = new boolean[numberOfChannels];
@@ -53,11 +50,6 @@ public class MockInputGate extends InputGate {
 	}
 
 	@Override
-	public int getPageSize() {
-		return pageSize;
-	}
-
-	@Override
 	public void setup() {
 	}