You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/04 08:25:49 UTC

[flink] 02/04: [FLINK-12738][network] Remove abstract getPageSize method from InputGate

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b6563e385263beb556ce19855ca6dfdbb7f6c853
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Thu Jun 6 15:51:28 2019 +0800

    [FLINK-12738][network] Remove abstract getPageSize method from InputGate
    
    Currently InputGate#getPageSize is only used for testing purpose. In order to make abstract InputGate simple and clean,
    it is better to remove unnecessary abstract methods via refactoring relevant tests not to rely on it.
---
 .../runtime/io/network/partition/consumer/InputGate.java    |  2 --
 .../io/network/partition/consumer/SingleInputGate.java      | 10 ----------
 .../io/network/partition/consumer/UnionInputGate.java       | 13 -------------
 .../flink/runtime/taskmanager/InputGateWithMetrics.java     |  5 -----
 .../io/CheckpointBarrierAlignerAlignmentLimitTest.java      |  8 ++++----
 .../io/CheckpointBarrierAlignerMassiveRandomTest.java       |  7 +------
 .../runtime/io/CheckpointBarrierAlignerTestBase.java        |  2 +-
 .../streaming/runtime/io/CheckpointBarrierTrackerTest.java  |  2 +-
 .../apache/flink/streaming/runtime/io/MockInputGate.java    | 10 +---------
 9 files changed, 8 insertions(+), 51 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index c297f70..0ce446b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -95,8 +95,6 @@ public abstract class InputGate implements AsyncDataInput<BufferOrEvent>, AutoCl
 
 	public abstract void sendTaskEvent(TaskEvent event) throws IOException;
 
-	public abstract int getPageSize();
-
 	/**
 	 * @return a future that is completed if there are more records available. If there are more
 	 * records available immediately, {@link #AVAILABLE} should be returned. Previously returned
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 7ba4298..b23572d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -243,16 +243,6 @@ public class SingleInputGate extends InputGate {
 		return bufferPool;
 	}
 
-	@Override
-	public int getPageSize() {
-		if (bufferPool != null) {
-			return bufferPool.getMemorySegmentSize();
-		}
-		else {
-			throw new IllegalStateException("Input gate has not been initialized with buffers.");
-		}
-	}
-
 	public int getNumberOfQueuedBuffers() {
 		// re-try 3 times, if fails, return 0 for "unknown"
 		for (int retry = 0; retry < 3; retry++) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index aa8f911..65a15ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -265,19 +265,6 @@ public class UnionInputGate extends InputGate {
 	}
 
 	@Override
-	public int getPageSize() {
-		int pageSize = -1;
-		for (InputGate gate : inputGates) {
-			if (pageSize == -1) {
-				pageSize = gate.getPageSize();
-			} else if (gate.getPageSize() != pageSize) {
-				throw new IllegalStateException("Found input gates with different page sizes.");
-			}
-		}
-		return pageSize;
-	}
-
-	@Override
 	public void setup() {
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
index 87d4b0f..669c02e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
@@ -86,11 +86,6 @@ public class InputGateWithMetrics extends InputGate {
 	}
 
 	@Override
-	public int getPageSize() {
-		return inputGate.getPageSize();
-	}
-
-	@Override
 	public void close() throws Exception {
 		inputGate.close();
 	}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java
index 341e291..b4aad0f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java
@@ -115,11 +115,11 @@ public class CheckpointBarrierAlignerAlignmentLimitTest {
 		};
 
 		// the barrier buffer has a limit that only 1000 bytes may be spilled in alignment
-		MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
+		MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
 		AbstractInvokable toNotify = mock(AbstractInvokable.class);
 		CheckpointedInputGate buffer = new CheckpointedInputGate(
 			gate,
-			new BufferSpiller(ioManager, gate.getPageSize(), 1000),
+			new BufferSpiller(ioManager, PAGE_SIZE, 1000),
 			"Testing",
 			toNotify);
 
@@ -212,11 +212,11 @@ public class CheckpointBarrierAlignerAlignmentLimitTest {
 		};
 
 		// the barrier buffer has a limit that only 1000 bytes may be spilled in alignment
-		MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
+		MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
 		AbstractInvokable toNotify = mock(AbstractInvokable.class);
 		CheckpointedInputGate buffer = new CheckpointedInputGate(
 			gate,
-			new BufferSpiller(ioManager, gate.getPageSize(), 500),
+			new BufferSpiller(ioManager, PAGE_SIZE, 500),
 			"Testing",
 			toNotify);
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java
index 2b452bd..552818d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java
@@ -60,7 +60,7 @@ public class CheckpointBarrierAlignerMassiveRandomTest {
 					new BufferPool[] { pool1, pool2 },
 					new BarrierGenerator[] { new CountBarrier(100000), new RandomBarrier(100000) });
 
-			CheckpointedInputGate checkpointedInputGate = new CheckpointedInputGate(myIG, new BufferSpiller(ioMan, myIG.getPageSize()), "Testing: No task associated", null);
+			CheckpointedInputGate checkpointedInputGate = new CheckpointedInputGate(myIG, new BufferSpiller(ioMan, PAGE_SIZE), "Testing: No task associated", null);
 
 			for (int i = 0; i < 2000000; i++) {
 				BufferOrEvent boe = checkpointedInputGate.pollNext().get();
@@ -182,11 +182,6 @@ public class CheckpointBarrierAlignerMassiveRandomTest {
 		public void sendTaskEvent(TaskEvent event) {}
 
 		@Override
-		public int getPageSize() {
-			return PAGE_SIZE;
-		}
-
-		@Override
 		public void setup() {
 		}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTestBase.java
index 687c95d..9e82775 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTestBase.java
@@ -76,7 +76,7 @@ public abstract class CheckpointBarrierAlignerTestBase {
 		int numberOfChannels,
 		BufferOrEvent[] sequence,
 		@Nullable AbstractInvokable toNotify) throws IOException {
-		MockInputGate gate = new MockInputGate(PAGE_SIZE, numberOfChannels, Arrays.asList(sequence));
+		MockInputGate gate = new MockInputGate(numberOfChannels, Arrays.asList(sequence));
 		return createBarrierBuffer(gate, toNotify);
 	}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierTrackerTest.java
index 2218680..3f2e592 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierTrackerTest.java
@@ -373,7 +373,7 @@ public class CheckpointBarrierTrackerTest {
 			int numberOfChannels,
 			BufferOrEvent[] sequence,
 			@Nullable AbstractInvokable toNotifyOnCheckpoint) {
-		MockInputGate gate = new MockInputGate(PAGE_SIZE, numberOfChannels, Arrays.asList(sequence));
+		MockInputGate gate = new MockInputGate(numberOfChannels, Arrays.asList(sequence));
 		return new CheckpointedInputGate(
 			gate,
 			new CachedBufferStorage(PAGE_SIZE, -1, "Testing"),
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
index bc37f74..5f95e17 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
@@ -33,8 +33,6 @@ import java.util.Queue;
  */
 public class MockInputGate extends InputGate {
 
-	private final int pageSize;
-
 	private final int numberOfChannels;
 
 	private final Queue<BufferOrEvent> bufferOrEvents;
@@ -43,8 +41,7 @@ public class MockInputGate extends InputGate {
 
 	private int closedChannels;
 
-	public MockInputGate(int pageSize, int numberOfChannels, List<BufferOrEvent> bufferOrEvents) {
-		this.pageSize = pageSize;
+	MockInputGate(int numberOfChannels, List<BufferOrEvent> bufferOrEvents) {
 		this.numberOfChannels = numberOfChannels;
 		this.bufferOrEvents = new ArrayDeque<BufferOrEvent>(bufferOrEvents);
 		this.closed = new boolean[numberOfChannels];
@@ -53,11 +50,6 @@ public class MockInputGate extends InputGate {
 	}
 
 	@Override
-	public int getPageSize() {
-		return pageSize;
-	}
-
-	@Override
 	public void setup() {
 	}