You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/06/18 09:25:35 UTC

[flink] 02/05: [FLINK-18094][network] Add InputGate#getChannelInfos for easier testing.

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b399a145cde22dba7a8255b05309daee14e6c1da
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Mon Jun 15 21:09:13 2020 +0200

    [FLINK-18094][network] Add InputGate#getChannelInfos for easier testing.
    
    In the following commits, this method will be used to fetch information about all channels without needing to access the channels explicitly. Thus, in tests, mocks only need to return meaningful InputChannelInfos instead of actually creating the respective channels.
---
 .../runtime/io/network/partition/consumer/InputGate.java    | 13 +++++++++++++
 .../flink/streaming/runtime/io/CheckpointedInputGate.java   |  6 ++++++
 .../flink/streaming/runtime/io/MockIndexedInputGate.java    | 11 +++++++++++
 .../apache/flink/streaming/runtime/io/MockInputGate.java    | 10 ++++++++++
 4 files changed, 40 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index 44420b8..0489fde 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -19,14 +19,18 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.PullingAsyncDataInput;
 import org.apache.flink.runtime.io.network.buffer.BufferReceivedListener;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -118,6 +122,15 @@ public abstract class InputGate implements PullingAsyncDataInput<BufferOrEvent>,
 	public abstract InputChannel getChannel(int channelIndex);
 
 	/**
+	 * Returns the {@link InputChannelInfo} of every channel of this gate, ordered by channel index.
+	 */
+	public List<InputChannelInfo> getChannelInfos() {
+		return IntStream.range(0, getNumberOfInputChannels())
+			.mapToObj(index -> getChannel(index).getChannelInfo())
+			.collect(Collectors.toList());
+	}
+
+	/**
 	 * Simple pojo for INPUT, DATA and moreAvailable.
 	 */
 	protected static class InputWithData<INPUT, DATA> {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
index 65fa098..9428515 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.io.PullingAsyncDataInput;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -34,6 +35,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
@@ -231,6 +233,10 @@ public class CheckpointedInputGate implements PullingAsyncDataInput<BufferOrEven
 		return inputGate.getChannel(channelIndex);
 	}
 
+	public List<InputChannelInfo> getChannelInfos() {
+		return inputGate.getChannelInfos();
+	}
+
 	@VisibleForTesting
 	CheckpointBarrierHandler getCheckpointBarrierHandler() {
 		return barrierHandler;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
index a04ec55..3ec99a8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
@@ -19,15 +19,19 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.buffer.BufferReceivedListener;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
 
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /**
  * Mock {@link IndexedInputGate}.
@@ -73,6 +77,13 @@ public class MockIndexedInputGate extends IndexedInputGate {
 	}
 
 	@Override
+	public List<InputChannelInfo> getChannelInfos() {
+		return IntStream.range(0, numberOfInputChannels)
+				.mapToObj(channelIndex -> new InputChannelInfo(gateIndex, channelIndex))
+				.collect(Collectors.toList());
+	}
+
+	@Override
 	public boolean isFinished() {
 		return false;
 	}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
index d0e958a..a536cbd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.buffer.BufferReceivedListener;
@@ -33,6 +34,8 @@ import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /**
  * Mock {@link InputGate}.
@@ -89,6 +92,13 @@ public class MockInputGate extends InputGate {
 	}
 
 	@Override
+	public List<InputChannelInfo> getChannelInfos() {
+		return IntStream.range(0, numberOfChannels)
+			.mapToObj(channelIndex -> new InputChannelInfo(0, channelIndex))
+			.collect(Collectors.toList());
+	}
+
+	@Override
 	public boolean isFinished() {
 		return finishAfterLastBuffer && bufferOrEvents.isEmpty();
 	}