You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2018/07/16 12:02:38 UTC
flink git commit: [FLINK-8731] Replaced mockito with custom mock in
TestInputChannel
Repository: flink
Updated Branches:
refs/heads/master 5be27a23d -> 010f66547
[FLINK-8731] Replaced mockito with custom mock in TestInputChannel
This closes #6338
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/010f6654
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/010f6654
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/010f6654
Branch: refs/heads/master
Commit: 010f6654709d2212f3d81c7ad73c73f7ebd47ea8
Parents: 5be27a2
Author: Dawid Wysakowicz <dw...@apache.org>
Authored: Thu Jul 12 16:17:48 2018 +0200
Committer: Dawid Wysakowicz <dw...@apache.org>
Committed: Mon Jul 16 14:01:22 2018 +0200
----------------------------------------------------------------------
.../IteratorWrappingTestSingleInputGate.java | 21 ++--
.../partition/consumer/SingleInputGateTest.java | 14 +--
.../partition/consumer/TestInputChannel.java | 125 ++++++++++++-------
.../partition/consumer/TestSingleInputGate.java | 2 +-
.../partition/consumer/UnionInputGateTest.java | 16 +--
.../consumer/StreamTestSingleInputGate.java | 73 +++++------
6 files changed, 142 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/010f6654/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
index 105e35f..a914733 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
@@ -25,19 +25,16 @@ import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
+import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel.BufferAndAvailabilityProvider;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.MutableObjectIterator;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
import java.io.IOException;
import java.util.Optional;
import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer;
import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
-import static org.mockito.Mockito.when;
public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> extends TestSingleInputGate {
@@ -66,12 +63,12 @@ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> e
// The input iterator can produce an infinite stream. That's why we have to serialize each
// record on demand and cannot do it upfront.
- final Answer<Optional<BufferAndAvailability>> answer = new Answer<Optional<BufferAndAvailability>>() {
+ final BufferAndAvailabilityProvider answer = new BufferAndAvailabilityProvider() {
private boolean hasData = inputIterator.next(reuse) != null;
@Override
- public Optional<BufferAndAvailability> answer(InvocationOnMock invocationOnMock) throws Throwable {
+ public Optional<BufferAndAvailability> getBufferAvailability() throws IOException {
if (hasData) {
serializer.clear();
BufferBuilder bufferBuilder = createBufferBuilder(bufferSize);
@@ -83,22 +80,24 @@ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> e
// Call getCurrentBuffer to ensure size is set
return Optional.of(new BufferAndAvailability(buildSingleBuffer(bufferBuilder), true, 0));
} else {
- when(inputChannel.getInputChannel().isReleased()).thenReturn(true);
+ inputChannel.setReleased();
- return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false, 0));
+ return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE),
+ false,
+ 0));
}
}
};
- when(inputChannel.getInputChannel().getNextBuffer()).thenAnswer(answer);
+ inputChannel.addBufferAndAvailability(answer);
- inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannel.getInputChannel());
+ inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannel);
return this;
}
public IteratorWrappingTestSingleInputGate<T> notifyNonEmpty() {
- inputGate.notifyChannelNonEmpty(inputChannel.getInputChannel());
+ inputGate.notifyChannelNonEmpty(inputChannel);
return this;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/010f6654/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index c244668..7120327 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -105,10 +105,10 @@ public class SingleInputGateTest {
};
inputGate.setInputChannel(
- new IntermediateResultPartitionID(), inputChannels[0].getInputChannel());
+ new IntermediateResultPartitionID(), inputChannels[0]);
inputGate.setInputChannel(
- new IntermediateResultPartitionID(), inputChannels[1].getInputChannel());
+ new IntermediateResultPartitionID(), inputChannels[1]);
// Test
inputChannels[0].readBuffer();
@@ -117,8 +117,8 @@ public class SingleInputGateTest {
inputChannels[1].readEndOfPartitionEvent();
inputChannels[0].readEndOfPartitionEvent();
- inputGate.notifyChannelNonEmpty(inputChannels[0].getInputChannel());
- inputGate.notifyChannelNonEmpty(inputChannels[1].getInputChannel());
+ inputGate.notifyChannelNonEmpty(inputChannels[0]);
+ inputGate.notifyChannelNonEmpty(inputChannels[1]);
verifyBufferOrEvent(inputGate, true, 0, true);
verifyBufferOrEvent(inputGate, true, 1, true);
@@ -141,16 +141,16 @@ public class SingleInputGateTest {
};
inputGate.setInputChannel(
- new IntermediateResultPartitionID(), inputChannels[0].getInputChannel());
+ new IntermediateResultPartitionID(), inputChannels[0]);
inputGate.setInputChannel(
- new IntermediateResultPartitionID(), inputChannels[1].getInputChannel());
+ new IntermediateResultPartitionID(), inputChannels[1]);
// Test
inputChannels[0].readBuffer();
inputChannels[0].readBuffer(false);
- inputGate.notifyChannelNonEmpty(inputChannels[0].getInputChannel());
+ inputGate.notifyChannelNonEmpty(inputChannels[0]);
verifyBufferOrEvent(inputGate, true, 0, true);
verifyBufferOrEvent(inputGate, true, 0, false);
http://git-wip-us.apache.org/repos/asf/flink/blob/010f6654/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
index 3ae3a8a..80e07f5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
@@ -18,18 +18,18 @@
package org.apache.flink.runtime.io.network.partition.consumer;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.mockito.stubbing.OngoingStubbing;
import java.io.IOException;
import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -39,20 +39,16 @@ import static org.mockito.Mockito.when;
/**
* A mocked input channel.
*/
-public class TestInputChannel {
+public class TestInputChannel extends InputChannel {
- private final InputChannel mock = Mockito.mock(InputChannel.class);
+ private final Queue<BufferAndAvailabilityProvider> buffers = new ConcurrentLinkedQueue<>();
- private final SingleInputGate inputGate;
+ private BufferAndAvailabilityProvider lastProvider = null;
- // Abusing Mockito here... ;)
- protected OngoingStubbing<Optional<BufferAndAvailability>> stubbing;
+ private boolean isReleased = false;
- public TestInputChannel(SingleInputGate inputGate, int channelIndex) {
- checkArgument(channelIndex >= 0);
- this.inputGate = checkNotNull(inputGate);
-
- when(mock.getChannelIndex()).thenReturn(channelIndex);
+ TestInputChannel(SingleInputGate inputGate, int channelIndex) {
+ super(inputGate, channelIndex, new ResultPartitionID(), 0, 0, new SimpleCounter());
}
public TestInputChannel read(Buffer buffer) throws IOException, InterruptedException {
@@ -60,48 +56,40 @@ public class TestInputChannel {
}
public TestInputChannel read(Buffer buffer, boolean moreAvailable) throws IOException, InterruptedException {
- if (stubbing == null) {
- stubbing = when(mock.getNextBuffer()).thenReturn(Optional.of(new BufferAndAvailability(buffer, moreAvailable, 0)));
- } else {
- stubbing = stubbing.thenReturn(Optional.of(new BufferAndAvailability(buffer, moreAvailable, 0)));
- }
+ addBufferAndAvailability(new BufferAndAvailability(buffer, moreAvailable, 0));
return this;
}
- public TestInputChannel readBuffer() throws IOException, InterruptedException {
+ TestInputChannel readBuffer() throws IOException, InterruptedException {
return readBuffer(true);
}
- public TestInputChannel readBuffer(boolean moreAvailable) throws IOException, InterruptedException {
+ TestInputChannel readBuffer(boolean moreAvailable) throws IOException, InterruptedException {
final Buffer buffer = mock(Buffer.class);
when(buffer.isBuffer()).thenReturn(true);
return read(buffer, moreAvailable);
}
- public TestInputChannel readEndOfPartitionEvent() throws IOException, InterruptedException {
- final Answer<Optional<BufferAndAvailability>> answer = new Answer<Optional<BufferAndAvailability>>() {
- @Override
- public Optional<BufferAndAvailability> answer(InvocationOnMock invocationOnMock) throws Throwable {
- // Return true after finishing
- when(mock.isReleased()).thenReturn(true);
-
- return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false, 0));
+ TestInputChannel readEndOfPartitionEvent() throws InterruptedException {
+ addBufferAndAvailability(
+ () -> {
+ setReleased();
+ return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE),
+ false,
+ 0));
}
- };
-
- if (stubbing == null) {
- stubbing = when(mock.getNextBuffer()).thenAnswer(answer);
- } else {
- stubbing = stubbing.thenAnswer(answer);
- }
-
+ );
return this;
}
- public InputChannel getInputChannel() {
- return mock;
+ void addBufferAndAvailability(BufferAndAvailability bufferAndAvailability) {
+ buffers.add(() -> Optional.of(bufferAndAvailability));
+ }
+
+ void addBufferAndAvailability(BufferAndAvailabilityProvider bufferAndAvailability) {
+ buffers.add(bufferAndAvailability);
}
// ------------------------------------------------------------------------
@@ -111,7 +99,7 @@ public class TestInputChannel {
*
* @return The created test input channels.
*/
- public static TestInputChannel[] createInputChannels(SingleInputGate inputGate, int numberOfInputChannels) {
+ static TestInputChannel[] createInputChannels(SingleInputGate inputGate, int numberOfInputChannels) {
checkNotNull(inputGate);
checkArgument(numberOfInputChannels > 0);
@@ -120,9 +108,62 @@ public class TestInputChannel {
for (int i = 0; i < numberOfInputChannels; i++) {
mocks[i] = new TestInputChannel(inputGate, i);
- inputGate.setInputChannel(new IntermediateResultPartitionID(), mocks[i].getInputChannel());
+ inputGate.setInputChannel(new IntermediateResultPartitionID(), mocks[i]);
}
return mocks;
}
+
+ @Override
+ void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
+
+ }
+
+ @Override
+ Optional<BufferAndAvailability> getNextBuffer() throws IOException, InterruptedException {
+ BufferAndAvailabilityProvider provider = buffers.poll();
+
+ if (provider != null) {
+ lastProvider = provider;
+ return provider.getBufferAvailability();
+ } else if (lastProvider != null) {
+ return lastProvider.getBufferAvailability();
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ @Override
+ void sendTaskEvent(TaskEvent event) throws IOException {
+
+ }
+
+ @Override
+ boolean isReleased() {
+ return isReleased;
+ }
+
+ void setReleased() {
+ this.isReleased = true;
+ }
+
+ @Override
+ void notifySubpartitionConsumed() throws IOException {
+
+ }
+
+ @Override
+ void releaseAllResources() throws IOException {
+
+ }
+
+ @Override
+ protected void notifyChannelNonEmpty() {
+
+ }
+
+ interface BufferAndAvailabilityProvider {
+ Optional<BufferAndAvailability> getBufferAvailability() throws IOException, InterruptedException;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/010f6654/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index 33dc1ca..b0bafd5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -98,7 +98,7 @@ public class TestSingleInputGate {
if (initialize) {
for (int i = 0; i < numberOfInputChannels; i++) {
inputChannels[i] = new TestInputChannel(inputGate, i);
- inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannels[i].getInputChannel());
+ inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannels[i]);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/010f6654/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index 081d97d..2e01225 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -86,15 +86,15 @@ public class UnionInputGateTest {
inputChannels[1][1].readEndOfPartitionEvent(); // 0 => 3
inputChannels[1][0].readEndOfPartitionEvent(); // 0 => 3
- ig1.notifyChannelNonEmpty(inputChannels[0][0].getInputChannel());
- ig1.notifyChannelNonEmpty(inputChannels[0][1].getInputChannel());
- ig1.notifyChannelNonEmpty(inputChannels[0][2].getInputChannel());
+ ig1.notifyChannelNonEmpty(inputChannels[0][0]);
+ ig1.notifyChannelNonEmpty(inputChannels[0][1]);
+ ig1.notifyChannelNonEmpty(inputChannels[0][2]);
- ig2.notifyChannelNonEmpty(inputChannels[1][0].getInputChannel());
- ig2.notifyChannelNonEmpty(inputChannels[1][1].getInputChannel());
- ig2.notifyChannelNonEmpty(inputChannels[1][2].getInputChannel());
- ig2.notifyChannelNonEmpty(inputChannels[1][3].getInputChannel());
- ig2.notifyChannelNonEmpty(inputChannels[1][4].getInputChannel());
+ ig2.notifyChannelNonEmpty(inputChannels[1][0]);
+ ig2.notifyChannelNonEmpty(inputChannels[1][1]);
+ ig2.notifyChannelNonEmpty(inputChannels[1][2]);
+ ig2.notifyChannelNonEmpty(inputChannels[1][3]);
+ ig2.notifyChannelNonEmpty(inputChannels[1][4]);
verifyBufferOrEvent(union, true, 0, true); // gate 1, channel 0
verifyBufferOrEvent(union, true, 3, true); // gate 2, channel 0
http://git-wip-us.apache.org/repos/asf/flink/blob/010f6654/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index 6ab8074..ea38382 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -28,14 +28,12 @@ import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
+import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel.BufferAndAvailabilityProvider;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -43,7 +41,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer;
import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.when;
/**
* Test {@link InputGate} that allows setting multiple channels. Use
@@ -94,44 +91,40 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
inputQueues[channelIndex] = new ConcurrentLinkedQueue<InputValue<Object>>();
inputChannels[channelIndex] = new TestInputChannel(inputGate, i);
- final Answer<Optional<BufferAndAvailability>> answer = new Answer<Optional<BufferAndAvailability>>() {
- @Override
- public Optional<BufferAndAvailability> answer(InvocationOnMock invocationOnMock) throws Throwable {
- ConcurrentLinkedQueue<InputValue<Object>> inputQueue = inputQueues[channelIndex];
- InputValue<Object> input;
- boolean moreAvailable;
- synchronized (inputQueue) {
- input = inputQueue.poll();
- moreAvailable = !inputQueue.isEmpty();
- }
- if (input != null && input.isStreamEnd()) {
- when(inputChannels[channelIndex].getInputChannel().isReleased()).thenReturn(
- true);
- return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), moreAvailable, 0));
- } else if (input != null && input.isStreamRecord()) {
- Object inputElement = input.getStreamRecord();
-
- BufferBuilder bufferBuilder = createBufferBuilder(bufferSize);
- recordSerializer.continueWritingWithNextBufferBuilder(bufferBuilder);
- delegate.setInstance(inputElement);
- recordSerializer.addRecord(delegate);
- bufferBuilder.finish();
-
- // Call getCurrentBuffer to ensure size is set
- return Optional.of(new BufferAndAvailability(buildSingleBuffer(bufferBuilder), moreAvailable, 0));
- } else if (input != null && input.isEvent()) {
- AbstractEvent event = input.getEvent();
- return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(event), moreAvailable, 0));
- } else {
- return Optional.empty();
- }
+ final BufferAndAvailabilityProvider answer = () -> {
+ ConcurrentLinkedQueue<InputValue<Object>> inputQueue = inputQueues[channelIndex];
+ InputValue<Object> input;
+ boolean moreAvailable;
+ synchronized (inputQueue) {
+ input = inputQueue.poll();
+ moreAvailable = !inputQueue.isEmpty();
+ }
+ if (input != null && input.isStreamEnd()) {
+ inputChannels[channelIndex].setReleased();
+ return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), moreAvailable, 0));
+ } else if (input != null && input.isStreamRecord()) {
+ Object inputElement = input.getStreamRecord();
+
+ BufferBuilder bufferBuilder = createBufferBuilder(bufferSize);
+ recordSerializer.continueWritingWithNextBufferBuilder(bufferBuilder);
+ delegate.setInstance(inputElement);
+ recordSerializer.addRecord(delegate);
+ bufferBuilder.finish();
+
+ // Call getCurrentBuffer to ensure size is set
+ return Optional.of(new BufferAndAvailability(buildSingleBuffer(bufferBuilder), moreAvailable, 0));
+ } else if (input != null && input.isEvent()) {
+ AbstractEvent event = input.getEvent();
+ return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(event), moreAvailable, 0));
+ } else {
+ return Optional.empty();
}
};
- when(inputChannels[channelIndex].getInputChannel().getNextBuffer()).thenAnswer(answer);
+ inputChannels[channelIndex].addBufferAndAvailability(answer);
inputGate.setInputChannel(new IntermediateResultPartitionID(),
- inputChannels[channelIndex].getInputChannel());
+ inputChannels[channelIndex]);
}
}
@@ -140,7 +133,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
inputQueues[channel].add(InputValue.element(element));
inputQueues[channel].notifyAll();
}
- inputGate.notifyChannelNonEmpty(inputChannels[channel].getInputChannel());
+ inputGate.notifyChannelNonEmpty(inputChannels[channel]);
}
public void sendEvent(AbstractEvent event, int channel) {
@@ -148,7 +141,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
inputQueues[channel].add(InputValue.event(event));
inputQueues[channel].notifyAll();
}
- inputGate.notifyChannelNonEmpty(inputChannels[channel].getInputChannel());
+ inputGate.notifyChannelNonEmpty(inputChannels[channel]);
}
public void endInput() {
@@ -157,7 +150,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
inputQueues[i].add(InputValue.streamEnd());
inputQueues[i].notifyAll();
}
- inputGate.notifyChannelNonEmpty(inputChannels[i].getInputChannel());
+ inputGate.notifyChannelNonEmpty(inputChannels[i]);
}
}