You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2018/07/16 12:02:38 UTC

flink git commit: [FLINK-8731] Replaced mockito with custom mock in TestInputChannel

Repository: flink
Updated Branches:
  refs/heads/master 5be27a23d -> 010f66547


[FLINK-8731] Replaced mockito with custom mock in TestInputChannel

This closes #6338


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/010f6654
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/010f6654
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/010f6654

Branch: refs/heads/master
Commit: 010f6654709d2212f3d81c7ad73c73f7ebd47ea8
Parents: 5be27a2
Author: Dawid Wysakowicz <dw...@apache.org>
Authored: Thu Jul 12 16:17:48 2018 +0200
Committer: Dawid Wysakowicz <dw...@apache.org>
Committed: Mon Jul 16 14:01:22 2018 +0200

----------------------------------------------------------------------
 .../IteratorWrappingTestSingleInputGate.java    |  21 ++--
 .../partition/consumer/SingleInputGateTest.java |  14 +--
 .../partition/consumer/TestInputChannel.java    | 125 ++++++++++++-------
 .../partition/consumer/TestSingleInputGate.java |   2 +-
 .../partition/consumer/UnionInputGateTest.java  |  16 +--
 .../consumer/StreamTestSingleInputGate.java     |  73 +++++------
 6 files changed, 142 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/010f6654/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
index 105e35f..a914733 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
@@ -25,19 +25,16 @@ import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
+import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel.BufferAndAvailabilityProvider;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.MutableObjectIterator;
 
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
 import java.io.IOException;
 import java.util.Optional;
 
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer;
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
-import static org.mockito.Mockito.when;
 
 public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> extends TestSingleInputGate {
 
@@ -66,12 +63,12 @@ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> e
 
 		// The input iterator can produce an infinite stream. That's why we have to serialize each
 		// record on demand and cannot do it upfront.
-		final Answer<Optional<BufferAndAvailability>> answer = new Answer<Optional<BufferAndAvailability>>() {
+		final BufferAndAvailabilityProvider answer = new BufferAndAvailabilityProvider() {
 
 			private boolean hasData = inputIterator.next(reuse) != null;
 
 			@Override
-			public Optional<BufferAndAvailability> answer(InvocationOnMock invocationOnMock) throws Throwable {
+			public Optional<BufferAndAvailability> getBufferAvailability() throws IOException {
 				if (hasData) {
 					serializer.clear();
 					BufferBuilder bufferBuilder = createBufferBuilder(bufferSize);
@@ -83,22 +80,24 @@ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> e
 					// Call getCurrentBuffer to ensure size is set
 					return Optional.of(new BufferAndAvailability(buildSingleBuffer(bufferBuilder), true, 0));
 				} else {
-					when(inputChannel.getInputChannel().isReleased()).thenReturn(true);
+					inputChannel.setReleased();
 
-					return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false, 0));
+					return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE),
+						false,
+						0));
 				}
 			}
 		};
 
-		when(inputChannel.getInputChannel().getNextBuffer()).thenAnswer(answer);
+		inputChannel.addBufferAndAvailability(answer);
 
-		inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannel.getInputChannel());
+		inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannel);
 
 		return this;
 	}
 
 	public IteratorWrappingTestSingleInputGate<T> notifyNonEmpty() {
-		inputGate.notifyChannelNonEmpty(inputChannel.getInputChannel());
+		inputGate.notifyChannelNonEmpty(inputChannel);
 
 		return this;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/010f6654/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index c244668..7120327 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -105,10 +105,10 @@ public class SingleInputGateTest {
 		};
 
 		inputGate.setInputChannel(
-			new IntermediateResultPartitionID(), inputChannels[0].getInputChannel());
+			new IntermediateResultPartitionID(), inputChannels[0]);
 
 		inputGate.setInputChannel(
-			new IntermediateResultPartitionID(), inputChannels[1].getInputChannel());
+			new IntermediateResultPartitionID(), inputChannels[1]);
 
 		// Test
 		inputChannels[0].readBuffer();
@@ -117,8 +117,8 @@ public class SingleInputGateTest {
 		inputChannels[1].readEndOfPartitionEvent();
 		inputChannels[0].readEndOfPartitionEvent();
 
-		inputGate.notifyChannelNonEmpty(inputChannels[0].getInputChannel());
-		inputGate.notifyChannelNonEmpty(inputChannels[1].getInputChannel());
+		inputGate.notifyChannelNonEmpty(inputChannels[0]);
+		inputGate.notifyChannelNonEmpty(inputChannels[1]);
 
 		verifyBufferOrEvent(inputGate, true, 0, true);
 		verifyBufferOrEvent(inputGate, true, 1, true);
@@ -141,16 +141,16 @@ public class SingleInputGateTest {
 		};
 
 		inputGate.setInputChannel(
-			new IntermediateResultPartitionID(), inputChannels[0].getInputChannel());
+			new IntermediateResultPartitionID(), inputChannels[0]);
 
 		inputGate.setInputChannel(
-			new IntermediateResultPartitionID(), inputChannels[1].getInputChannel());
+			new IntermediateResultPartitionID(), inputChannels[1]);
 
 		// Test
 		inputChannels[0].readBuffer();
 		inputChannels[0].readBuffer(false);
 
-		inputGate.notifyChannelNonEmpty(inputChannels[0].getInputChannel());
+		inputGate.notifyChannelNonEmpty(inputChannels[0]);
 
 		verifyBufferOrEvent(inputGate, true, 0, true);
 		verifyBufferOrEvent(inputGate, true, 0, false);

http://git-wip-us.apache.org/repos/asf/flink/blob/010f6654/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
index 3ae3a8a..80e07f5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
@@ -18,18 +18,18 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.mockito.stubbing.OngoingStubbing;
 
 import java.io.IOException;
 import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -39,20 +39,16 @@ import static org.mockito.Mockito.when;
 /**
  * A mocked input channel.
  */
-public class TestInputChannel {
+public class TestInputChannel extends InputChannel {
 
-	private final InputChannel mock = Mockito.mock(InputChannel.class);
+	private final Queue<BufferAndAvailabilityProvider> buffers = new ConcurrentLinkedQueue<>();
 
-	private final SingleInputGate inputGate;
+	private BufferAndAvailabilityProvider lastProvider = null;
 
-	// Abusing Mockito here... ;)
-	protected OngoingStubbing<Optional<BufferAndAvailability>> stubbing;
+	private boolean isReleased = false;
 
-	public TestInputChannel(SingleInputGate inputGate, int channelIndex) {
-		checkArgument(channelIndex >= 0);
-		this.inputGate = checkNotNull(inputGate);
-
-		when(mock.getChannelIndex()).thenReturn(channelIndex);
+	TestInputChannel(SingleInputGate inputGate, int channelIndex) {
+		super(inputGate, channelIndex, new ResultPartitionID(), 0, 0, new SimpleCounter());
 	}
 
 	public TestInputChannel read(Buffer buffer) throws IOException, InterruptedException {
@@ -60,48 +56,40 @@ public class TestInputChannel {
 	}
 
 	public TestInputChannel read(Buffer buffer, boolean moreAvailable) throws IOException, InterruptedException {
-		if (stubbing == null) {
-			stubbing = when(mock.getNextBuffer()).thenReturn(Optional.of(new BufferAndAvailability(buffer, moreAvailable, 0)));
-		} else {
-			stubbing = stubbing.thenReturn(Optional.of(new BufferAndAvailability(buffer, moreAvailable, 0)));
-		}
+		addBufferAndAvailability(new BufferAndAvailability(buffer, moreAvailable, 0));
 
 		return this;
 	}
 
-	public TestInputChannel readBuffer() throws IOException, InterruptedException {
+	TestInputChannel readBuffer() throws IOException, InterruptedException {
 		return readBuffer(true);
 	}
 
-	public TestInputChannel readBuffer(boolean moreAvailable) throws IOException, InterruptedException {
+	TestInputChannel readBuffer(boolean moreAvailable) throws IOException, InterruptedException {
 		final Buffer buffer = mock(Buffer.class);
 		when(buffer.isBuffer()).thenReturn(true);
 
 		return read(buffer, moreAvailable);
 	}
 
-	public TestInputChannel readEndOfPartitionEvent() throws IOException, InterruptedException {
-		final Answer<Optional<BufferAndAvailability>> answer = new Answer<Optional<BufferAndAvailability>>() {
-			@Override
-			public Optional<BufferAndAvailability> answer(InvocationOnMock invocationOnMock) throws Throwable {
-				// Return true after finishing
-				when(mock.isReleased()).thenReturn(true);
-
-				return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false, 0));
+	TestInputChannel readEndOfPartitionEvent() throws InterruptedException {
+		addBufferAndAvailability(
+			() -> {
+				setReleased();
+				return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE),
+					false,
+					0));
 			}
-		};
-
-		if (stubbing == null) {
-			stubbing = when(mock.getNextBuffer()).thenAnswer(answer);
-		} else {
-			stubbing = stubbing.thenAnswer(answer);
-		}
-
+		);
 		return this;
 	}
 
-	public InputChannel getInputChannel() {
-		return mock;
+	void addBufferAndAvailability(BufferAndAvailability bufferAndAvailability) {
+		buffers.add(() -> Optional.of(bufferAndAvailability));
+	}
+
+	void addBufferAndAvailability(BufferAndAvailabilityProvider bufferAndAvailability) {
+		buffers.add(bufferAndAvailability);
 	}
 
 	// ------------------------------------------------------------------------
@@ -111,7 +99,7 @@ public class TestInputChannel {
 	 *
 	 * @return The created test input channels.
 	 */
-	public static TestInputChannel[] createInputChannels(SingleInputGate inputGate, int numberOfInputChannels) {
+	static TestInputChannel[] createInputChannels(SingleInputGate inputGate, int numberOfInputChannels) {
 		checkNotNull(inputGate);
 		checkArgument(numberOfInputChannels > 0);
 
@@ -120,9 +108,62 @@ public class TestInputChannel {
 		for (int i = 0; i < numberOfInputChannels; i++) {
 			mocks[i] = new TestInputChannel(inputGate, i);
 
-			inputGate.setInputChannel(new IntermediateResultPartitionID(), mocks[i].getInputChannel());
+			inputGate.setInputChannel(new IntermediateResultPartitionID(), mocks[i]);
 		}
 
 		return mocks;
 	}
+
+	@Override
+	void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
+
+	}
+
+	@Override
+	Optional<BufferAndAvailability> getNextBuffer() throws IOException, InterruptedException {
+		BufferAndAvailabilityProvider provider = buffers.poll();
+
+		if (provider != null) {
+			lastProvider = provider;
+			return provider.getBufferAvailability();
+		} else if (lastProvider != null) {
+			return lastProvider.getBufferAvailability();
+		} else {
+			return Optional.empty();
+		}
+	}
+
+	@Override
+	void sendTaskEvent(TaskEvent event) throws IOException {
+
+	}
+
+	@Override
+	boolean isReleased() {
+		return isReleased;
+	}
+
+	void setReleased() {
+		this.isReleased = true;
+	}
+
+	@Override
+	void notifySubpartitionConsumed() throws IOException {
+
+	}
+
+	@Override
+	void releaseAllResources() throws IOException {
+
+	}
+
+	@Override
+	protected void notifyChannelNonEmpty() {
+
+	}
+
+	interface BufferAndAvailabilityProvider {
+		Optional<BufferAndAvailability> getBufferAvailability() throws IOException, InterruptedException;
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/010f6654/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index 33dc1ca..b0bafd5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -98,7 +98,7 @@ public class TestSingleInputGate {
 		if (initialize) {
 			for (int i = 0; i < numberOfInputChannels; i++) {
 				inputChannels[i] = new TestInputChannel(inputGate, i);
-				inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannels[i].getInputChannel());
+				inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannels[i]);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/010f6654/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index 081d97d..2e01225 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -86,15 +86,15 @@ public class UnionInputGateTest {
 		inputChannels[1][1].readEndOfPartitionEvent(); // 0 => 3
 		inputChannels[1][0].readEndOfPartitionEvent(); // 0 => 3
 
-		ig1.notifyChannelNonEmpty(inputChannels[0][0].getInputChannel());
-		ig1.notifyChannelNonEmpty(inputChannels[0][1].getInputChannel());
-		ig1.notifyChannelNonEmpty(inputChannels[0][2].getInputChannel());
+		ig1.notifyChannelNonEmpty(inputChannels[0][0]);
+		ig1.notifyChannelNonEmpty(inputChannels[0][1]);
+		ig1.notifyChannelNonEmpty(inputChannels[0][2]);
 
-		ig2.notifyChannelNonEmpty(inputChannels[1][0].getInputChannel());
-		ig2.notifyChannelNonEmpty(inputChannels[1][1].getInputChannel());
-		ig2.notifyChannelNonEmpty(inputChannels[1][2].getInputChannel());
-		ig2.notifyChannelNonEmpty(inputChannels[1][3].getInputChannel());
-		ig2.notifyChannelNonEmpty(inputChannels[1][4].getInputChannel());
+		ig2.notifyChannelNonEmpty(inputChannels[1][0]);
+		ig2.notifyChannelNonEmpty(inputChannels[1][1]);
+		ig2.notifyChannelNonEmpty(inputChannels[1][2]);
+		ig2.notifyChannelNonEmpty(inputChannels[1][3]);
+		ig2.notifyChannelNonEmpty(inputChannels[1][4]);
 
 		verifyBufferOrEvent(union, true, 0, true); // gate 1, channel 0
 		verifyBufferOrEvent(union, true, 3, true); // gate 2, channel 0

http://git-wip-us.apache.org/repos/asf/flink/blob/010f6654/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index 6ab8074..ea38382 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -28,14 +28,12 @@ import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
+import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel.BufferAndAvailabilityProvider;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
 import java.io.IOException;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -43,7 +41,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer;
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.when;
 
 /**
  * Test {@link InputGate} that allows setting multiple channels. Use
@@ -94,44 +91,40 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
 			inputQueues[channelIndex] = new ConcurrentLinkedQueue<InputValue<Object>>();
 			inputChannels[channelIndex] = new TestInputChannel(inputGate, i);
 
-			final Answer<Optional<BufferAndAvailability>> answer = new Answer<Optional<BufferAndAvailability>>() {
-				@Override
-				public Optional<BufferAndAvailability> answer(InvocationOnMock invocationOnMock) throws Throwable {
-					ConcurrentLinkedQueue<InputValue<Object>> inputQueue = inputQueues[channelIndex];
-					InputValue<Object> input;
-					boolean moreAvailable;
-					synchronized (inputQueue) {
-						input = inputQueue.poll();
-						moreAvailable = !inputQueue.isEmpty();
-					}
-					if (input != null && input.isStreamEnd()) {
-						when(inputChannels[channelIndex].getInputChannel().isReleased()).thenReturn(
-							true);
-						return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), moreAvailable, 0));
-					} else if (input != null && input.isStreamRecord()) {
-						Object inputElement = input.getStreamRecord();
-
-						BufferBuilder bufferBuilder = createBufferBuilder(bufferSize);
-						recordSerializer.continueWritingWithNextBufferBuilder(bufferBuilder);
-						delegate.setInstance(inputElement);
-						recordSerializer.addRecord(delegate);
-						bufferBuilder.finish();
-
-						// Call getCurrentBuffer to ensure size is set
-						return Optional.of(new BufferAndAvailability(buildSingleBuffer(bufferBuilder), moreAvailable, 0));
-					} else if (input != null && input.isEvent()) {
-						AbstractEvent event = input.getEvent();
-						return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(event), moreAvailable, 0));
-					} else {
-						return Optional.empty();
-					}
+			final BufferAndAvailabilityProvider answer = () -> {
+				ConcurrentLinkedQueue<InputValue<Object>> inputQueue = inputQueues[channelIndex];
+				InputValue<Object> input;
+				boolean moreAvailable;
+				synchronized (inputQueue) {
+					input = inputQueue.poll();
+					moreAvailable = !inputQueue.isEmpty();
+				}
+				if (input != null && input.isStreamEnd()) {
+					inputChannels[channelIndex].setReleased();
+					return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), moreAvailable, 0));
+				} else if (input != null && input.isStreamRecord()) {
+					Object inputElement = input.getStreamRecord();
+
+					BufferBuilder bufferBuilder = createBufferBuilder(bufferSize);
+					recordSerializer.continueWritingWithNextBufferBuilder(bufferBuilder);
+					delegate.setInstance(inputElement);
+					recordSerializer.addRecord(delegate);
+					bufferBuilder.finish();
+
+					// Call getCurrentBuffer to ensure size is set
+					return Optional.of(new BufferAndAvailability(buildSingleBuffer(bufferBuilder), moreAvailable, 0));
+				} else if (input != null && input.isEvent()) {
+					AbstractEvent event = input.getEvent();
+					return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(event), moreAvailable, 0));
+				} else {
+					return Optional.empty();
 				}
 			};
 
-			when(inputChannels[channelIndex].getInputChannel().getNextBuffer()).thenAnswer(answer);
+			inputChannels[channelIndex].addBufferAndAvailability(answer);
 
 			inputGate.setInputChannel(new IntermediateResultPartitionID(),
-				inputChannels[channelIndex].getInputChannel());
+				inputChannels[channelIndex]);
 		}
 	}
 
@@ -140,7 +133,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
 			inputQueues[channel].add(InputValue.element(element));
 			inputQueues[channel].notifyAll();
 		}
-		inputGate.notifyChannelNonEmpty(inputChannels[channel].getInputChannel());
+		inputGate.notifyChannelNonEmpty(inputChannels[channel]);
 	}
 
 	public void sendEvent(AbstractEvent event, int channel) {
@@ -148,7 +141,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
 			inputQueues[channel].add(InputValue.event(event));
 			inputQueues[channel].notifyAll();
 		}
-		inputGate.notifyChannelNonEmpty(inputChannels[channel].getInputChannel());
+		inputGate.notifyChannelNonEmpty(inputChannels[channel]);
 	}
 
 	public void endInput() {
@@ -157,7 +150,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
 				inputQueues[i].add(InputValue.streamEnd());
 				inputQueues[i].notifyAll();
 			}
-			inputGate.notifyChannelNonEmpty(inputChannels[i].getInputChannel());
+			inputGate.notifyChannelNonEmpty(inputChannels[i]);
 		}
 	}