You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/03/26 11:51:55 UTC
[2/3] flink git commit: [runtime] Fix possible resource leak in
RemoteInputChannel
[runtime] Fix possible resource leak in RemoteInputChannel
It was possible for a release operation running concurrently with an enqueue
operation from the network I/O thread to leave a buffer queued at a released
channel, resulting in a resource leak.
This commit fixes the race and adds a test that successfully provoked the race
condition before the fix.
The visibility of input channel operations is reduced to package-private.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d72a3f7f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d72a3f7f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d72a3f7f
Branch: refs/heads/master
Commit: d72a3f7f99a7e242b191e993ced123d98a083d14
Parents: 95bdaad
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Mar 24 10:25:52 2015 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Mar 26 11:51:26 2015 +0100
----------------------------------------------------------------------
.../network/netty/PartitionRequestClient.java | 2 +-
.../partition/consumer/InputChannel.java | 14 +-
.../partition/consumer/LocalInputChannel.java | 14 +-
.../partition/consumer/RemoteInputChannel.java | 123 +++++++++------
.../io/network/api/reader/BufferReaderTest.java | 2 +-
.../IteratorWrappingTestSingleInputGate.java | 103 -------------
.../IteratorWrappingTestSingleInputGate.java | 101 ++++++++++++
.../consumer/RemoteInputChannelTest.java | 152 +++++++++++++++++++
.../partition/consumer/SingleInputGateTest.java | 1 -
.../partition/consumer/TestInputChannel.java | 134 ++++++++++++++++
.../partition/consumer/TestSingleInputGate.java | 139 +++++++++++++++++
.../partition/consumer/UnionInputGateTest.java | 1 -
.../io/network/util/TestBufferFactory.java | 6 +
.../io/network/util/TestInputChannel.java | 135 ----------------
.../io/network/util/TestSingleInputGate.java | 140 -----------------
.../runtime/operators/DataSinkTaskTest.java | 2 +-
.../operators/testutils/MockEnvironment.java | 2 +-
.../operators/testutils/TaskTestBase.java | 2 +-
18 files changed, 624 insertions(+), 449 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
index 32e3951..b5f89e0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
@@ -79,7 +79,7 @@ public class PartitionRequestClient {
* The request goes to the remote producer, for which this partition
* request client instance has been created.
*/
- public void requestIntermediateResultPartition(final ResultPartitionID partitionId, int requestedQueueIndex, final RemoteInputChannel inputChannel) throws IOException {
+ public void requestSubpartition(final ResultPartitionID partitionId, int requestedQueueIndex, final RemoteInputChannel inputChannel) throws IOException {
partitionRequestHandler.addInputChannel(inputChannel);
tcpChannel.writeAndFlush(new PartitionRequest(partitionId, requestedQueueIndex, inputChannel.getInputChannelId()))
http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 855509c..0805066 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -53,7 +53,7 @@ public abstract class InputChannel {
// Properties
// ------------------------------------------------------------------------
- public int getChannelIndex() {
+ int getChannelIndex() {
return channelIndex;
}
@@ -75,12 +75,12 @@ public abstract class InputChannel {
* The queue index to request depends on which sub task the channel belongs
* to and is specified by the consumer of this channel.
*/
- public abstract void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException;
+ abstract void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException;
/**
* Returns the next buffer from the consumed subpartition.
*/
- public abstract Buffer getNextBuffer() throws IOException, InterruptedException;
+ abstract Buffer getNextBuffer() throws IOException, InterruptedException;
// ------------------------------------------------------------------------
// Task events
@@ -94,19 +94,19 @@ public abstract class InputChannel {
* the producer will wait for all backwards events. Otherwise, this will lead to an Exception
* at runtime.
*/
- public abstract void sendTaskEvent(TaskEvent event) throws IOException;
+ abstract void sendTaskEvent(TaskEvent event) throws IOException;
// ------------------------------------------------------------------------
// Life cycle
// ------------------------------------------------------------------------
- public abstract boolean isReleased();
+ abstract boolean isReleased();
- public abstract void notifySubpartitionConsumed() throws IOException;
+ abstract void notifySubpartitionConsumed() throws IOException;
/**
* Releases all resources of the channel.
*/
- public abstract void releaseAllResources() throws IOException;
+ abstract void releaseAllResources() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 65f2627..8ea6407 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -53,7 +53,7 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
private volatile Buffer lookAhead;
- public LocalInputChannel(
+ LocalInputChannel(
SingleInputGate gate,
int channelIndex,
ResultPartitionID partitionId,
@@ -71,7 +71,7 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
// ------------------------------------------------------------------------
@Override
- public void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
+ void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
if (queueIterator == null) {
LOG.debug("Requesting LOCAL queue {} of partition {}.", subpartitionIndex, partitionId);
@@ -87,7 +87,7 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
}
@Override
- public Buffer getNextBuffer() throws IOException, InterruptedException {
+ Buffer getNextBuffer() throws IOException, InterruptedException {
checkState(queueIterator != null, "Queried for a buffer before requesting a queue.");
// After subscribe notification
@@ -115,7 +115,7 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
// ------------------------------------------------------------------------
@Override
- public void sendTaskEvent(TaskEvent event) throws IOException {
+ void sendTaskEvent(TaskEvent event) throws IOException {
checkState(queueIterator != null, "Tried to send task event to producer before requesting a queue.");
if (!taskEventDispatcher.publish(partitionId, event)) {
@@ -128,12 +128,12 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
// ------------------------------------------------------------------------
@Override
- public boolean isReleased() {
+ boolean isReleased() {
return isReleased;
}
@Override
- public void notifySubpartitionConsumed() throws IOException {
+ void notifySubpartitionConsumed() throws IOException {
if (queueIterator != null) {
queueIterator.notifySubpartitionConsumed();
}
@@ -144,7 +144,7 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
* iterator.
*/
@Override
- public void releaseAllResources() throws IOException {
+ void releaseAllResources() throws IOException {
if (!isReleased) {
if (lookAhead != null) {
lookAhead.recycle();
http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index cae7837..df653a4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -32,7 +32,6 @@ import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
@@ -44,37 +43,51 @@ public class RemoteInputChannel extends InputChannel {
private static final Logger LOG = LoggerFactory.getLogger(RemoteInputChannel.class);
- private final InputChannelID id;
+ /** ID to distinguish this channel from other channels sharing the same TCP connection. */
+ private final InputChannelID id = new InputChannelID();
- private final ConnectionID producerAddress;
+ /** The connection to use to request the remote partition. */
+ private final ConnectionID connectionId;
- private final Queue<Buffer> receivedBuffers = new ArrayDeque<Buffer>();
+ /** The connection manager to use connect to the remote partition provider. */
+ private final ConnectionManager connectionManager;
- private final AtomicReference<IOException> ioError = new AtomicReference<IOException>();
+ /**
+ * The received buffers. Received buffers are enqueued by the network I/O thread and the queue
+ * is consumed by the receiving task thread.
+ */
+ private final Queue<Buffer> receivedBuffers = new ArrayDeque<Buffer>();
+ /**
+ * Flag indicating whether this channel has been released. Either called by the receiving task
+ * thread or the task manager actor.
+ */
private final AtomicBoolean isReleased = new AtomicBoolean();
+ /** Client to establish a (possibly shared) TCP connection and request the partition. */
private PartitionRequestClient partitionRequestClient;
+ /**
+ * The next expected sequence number for the next buffer. This is modified by the network
+ * I/O thread only.
+ */
private int expectedSequenceNumber = 0;
- private ConnectionManager connectionManager;
+ /**
+ * An error possibly set by the network I/O thread.
+ */
+ private volatile Throwable error;
- public RemoteInputChannel(
- SingleInputGate gate,
+ RemoteInputChannel(
+ SingleInputGate inputGate,
int channelIndex,
ResultPartitionID partitionId,
- ConnectionID producerAddress,
+ ConnectionID connectionId,
ConnectionManager connectionManager) {
- super(gate, channelIndex, partitionId);
+ super(inputGate, channelIndex, partitionId);
- /**
- * This ID is used by the {@link PartitionRequestClient} to distinguish
- * between receivers, which share the same TCP connection.
- */
- this.id = new InputChannelID();
- this.producerAddress = checkNotNull(producerAddress);
+ this.connectionId = checkNotNull(connectionId);
this.connectionManager = checkNotNull(connectionManager);
}
@@ -83,22 +96,25 @@ public class RemoteInputChannel extends InputChannel {
// ------------------------------------------------------------------------
@Override
- public void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
+ void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
if (partitionRequestClient == null) {
- LOG.debug("Requesting REMOTE queue {} from of partition {}.", subpartitionIndex, partitionId);
+ LOG.debug("{}: Requesting REMOTE subpartition {} of partition {}.",
+ this, subpartitionIndex, partitionId);
- partitionRequestClient = connectionManager.createPartitionRequestClient(producerAddress);
+ // Create a client and request the partition
+ partitionRequestClient = connectionManager
+ .createPartitionRequestClient(connectionId);
- partitionRequestClient.requestIntermediateResultPartition(partitionId, subpartitionIndex, this);
+ partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this);
}
}
@Override
- public Buffer getNextBuffer() throws IOException {
+ Buffer getNextBuffer() throws IOException {
checkState(!isReleased.get(), "Queried for a buffer after channel has been closed.");
checkState(partitionRequestClient != null, "Queried for a buffer before requesting a queue.");
- checkIoError();
+ checkError();
synchronized (receivedBuffers) {
Buffer buffer = receivedBuffers.poll();
@@ -117,11 +133,11 @@ public class RemoteInputChannel extends InputChannel {
// ------------------------------------------------------------------------
@Override
- public void sendTaskEvent(TaskEvent event) throws IOException {
+ void sendTaskEvent(TaskEvent event) throws IOException {
checkState(!isReleased.get(), "Tried to send task event to producer after channel has been released.");
checkState(partitionRequestClient != null, "Tried to send task event to producer before requesting a queue.");
- checkIoError();
+ checkError();
partitionRequestClient.sendTaskEvent(partitionId, event, this);
}
@@ -131,12 +147,12 @@ public class RemoteInputChannel extends InputChannel {
// ------------------------------------------------------------------------
@Override
- public boolean isReleased() {
+ boolean isReleased() {
return isReleased.get();
}
@Override
- public void notifySubpartitionConsumed() {
+ void notifySubpartitionConsumed() {
// Nothing to do
}
@@ -144,7 +160,7 @@ public class RemoteInputChannel extends InputChannel {
* Releases all received buffers and closes the partition request client.
*/
@Override
- public void releaseAllResources() throws IOException {
+ void releaseAllResources() throws IOException {
if (isReleased.compareAndSet(false, true)) {
synchronized (receivedBuffers) {
Buffer buffer;
@@ -155,21 +171,28 @@ public class RemoteInputChannel extends InputChannel {
if (partitionRequestClient != null) {
partitionRequestClient.close(this);
- } else {
- connectionManager.closeOpenChannelConnections(producerAddress);
+ }
+ else {
+ connectionManager.closeOpenChannelConnections(connectionId);
}
}
}
@Override
public String toString() {
- return "RemoteInputChannel [" + partitionId + " at " + producerAddress + "]";
+ return "RemoteInputChannel [" + partitionId + " at " + connectionId + "]";
}
// ------------------------------------------------------------------------
// Network I/O notifications (called by network I/O thread)
// ------------------------------------------------------------------------
+ public int getNumberOfQueuedBuffers() {
+ synchronized (receivedBuffers) {
+ return receivedBuffers.size();
+ }
+ }
+
public InputChannelID getInputChannelId() {
return id;
}
@@ -186,8 +209,8 @@ public class RemoteInputChannel extends InputChannel {
boolean success = false;
try {
- if (!isReleased.get()) {
- synchronized (receivedBuffers) {
+ synchronized (receivedBuffers) {
+ if (!isReleased.get()) {
if (expectedSequenceNumber == sequenceNumber) {
receivedBuffers.add(buffer);
expectedSequenceNumber++;
@@ -195,13 +218,11 @@ public class RemoteInputChannel extends InputChannel {
notifyAvailableBuffer();
success = true;
-
- return;
+ }
+ else {
+ onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
}
}
-
- IOException error = new BufferReorderingException(expectedSequenceNumber, sequenceNumber);
- ioError.compareAndSet(null, error);
}
}
finally {
@@ -212,33 +233,35 @@ public class RemoteInputChannel extends InputChannel {
}
public void onEmptyBuffer(int sequenceNumber) {
- if (!isReleased.get()) {
- synchronized (receivedBuffers) {
+ synchronized (receivedBuffers) {
+ if (!isReleased.get()) {
if (expectedSequenceNumber == sequenceNumber) {
expectedSequenceNumber++;
}
else {
- IOException error = new BufferReorderingException(expectedSequenceNumber, sequenceNumber);
- ioError.compareAndSet(null, error);
+ onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
}
}
}
}
- public void onError(Throwable error) {
- if (ioError.compareAndSet(null, error instanceof IOException ? (IOException) error : new IOException(error))) {
+ public void onError(Throwable cause) {
+ if (error == null) {
+ error = cause;
+
+ // Notify the input gate to trigger querying of this channel
notifyAvailableBuffer();
}
}
- // ------------------------------------------------------------------------
-
- private void checkIoError() throws IOException {
- IOException error = ioError.get();
+ /**
+ * Checks whether this channel got notified by the network I/O thread about an error.
+ */
+ private void checkError() throws IOException {
+ final Throwable t = error;
- if (error != null) {
- throw new IOException(String.format("%s at remote input channel: %s].",
- error.getClass().getName(), error.getMessage()));
+ if (t != null) {
+ throw new IOException(t);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
index 7d3dbbe..e1f8fd8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.api.reader;
import org.apache.flink.runtime.event.task.TaskEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.util.TestSingleInputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.TestSingleInputGate;
import org.apache.flink.runtime.io.network.util.TestTaskEvent;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.util.event.EventListener;
http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/IteratorWrappingTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/IteratorWrappingTestSingleInputGate.java
deleted file mode 100644
index 3968eda..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/IteratorWrappingTestSingleInputGate.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.api.reader;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
-import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
-import org.apache.flink.runtime.io.network.util.TestInputChannel;
-import org.apache.flink.runtime.io.network.util.TestSingleInputGate;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.MutableObjectIterator;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.io.IOException;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> extends TestSingleInputGate {
-
- private final TestInputChannel inputChannel = new TestInputChannel(inputGate, 0);
-
- private final int bufferSize;
-
- private MutableObjectIterator<T> inputIterator;
-
- private RecordSerializer<T> serializer;
-
- private final T reuse;
-
- public IteratorWrappingTestSingleInputGate(int bufferSize, Class<T> recordType, MutableObjectIterator<T> iterator) throws IOException, InterruptedException {
- super(1, false);
-
- this.bufferSize = bufferSize;
- this.reuse = InstantiationUtil.instantiate(recordType);
-
- wrapIterator(iterator);
- }
-
- private IteratorWrappingTestSingleInputGate<T> wrapIterator(MutableObjectIterator<T> iterator) throws IOException, InterruptedException {
- inputIterator = iterator;
- serializer = new SpanningRecordSerializer<T>();
-
- // The input iterator can produce an infinite stream. That's why we have to serialize each
- // record on demand and cannot do it upfront.
- final Answer<Buffer> answer = new Answer<Buffer>() {
- @Override
- public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable {
- if (inputIterator.next(reuse) != null) {
- final Buffer buffer = new Buffer(new MemorySegment(new byte[bufferSize]), mock(BufferRecycler.class));
- serializer.setNextBuffer(buffer);
- serializer.addRecord(reuse);
-
- inputGate.onAvailableBuffer(inputChannel.getInputChannel());
-
- // Call getCurrentBuffer to ensure size is set
- return serializer.getCurrentBuffer();
- }
- else {
-
- when(inputChannel.getInputChannel().isReleased()).thenReturn(true);
-
- return EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
- }
- }
- };
-
- when(inputChannel.getInputChannel().getNextBuffer()).thenAnswer(answer);
-
- inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannel.getInputChannel());
-
- return this;
- }
-
- public IteratorWrappingTestSingleInputGate<T> read() {
- inputGate.onAvailableBuffer(inputChannel.getInputChannel());
-
- return this;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
new file mode 100644
index 0000000..65780b4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
+import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.MutableObjectIterator;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> extends TestSingleInputGate {
+
+ private final TestInputChannel inputChannel = new TestInputChannel(inputGate, 0);
+
+ private final int bufferSize;
+
+ private MutableObjectIterator<T> inputIterator;
+
+ private RecordSerializer<T> serializer;
+
+ private final T reuse;
+
+ public IteratorWrappingTestSingleInputGate(int bufferSize, Class<T> recordType, MutableObjectIterator<T> iterator) throws IOException, InterruptedException {
+ super(1, false);
+
+ this.bufferSize = bufferSize;
+ this.reuse = InstantiationUtil.instantiate(recordType);
+
+ wrapIterator(iterator);
+ }
+
+ private IteratorWrappingTestSingleInputGate<T> wrapIterator(MutableObjectIterator<T> iterator) throws IOException, InterruptedException {
+ inputIterator = iterator;
+ serializer = new SpanningRecordSerializer<T>();
+
+ // The input iterator can produce an infinite stream. That's why we have to serialize each
+ // record on demand and cannot do it upfront.
+ final Answer<Buffer> answer = new Answer<Buffer>() {
+ @Override
+ public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable {
+ if (inputIterator.next(reuse) != null) {
+ final Buffer buffer = new Buffer(new MemorySegment(new byte[bufferSize]), mock(BufferRecycler.class));
+ serializer.setNextBuffer(buffer);
+ serializer.addRecord(reuse);
+
+ inputGate.onAvailableBuffer(inputChannel.getInputChannel());
+
+ // Call getCurrentBuffer to ensure size is set
+ return serializer.getCurrentBuffer();
+ }
+ else {
+
+ when(inputChannel.getInputChannel().isReleased()).thenReturn(true);
+
+ return EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+ }
+ }
+ };
+
+ when(inputChannel.getInputChannel().getNextBuffer()).thenAnswer(answer);
+
+ inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannel.getInputChannel());
+
+ return this;
+ }
+
+ public IteratorWrappingTestSingleInputGate<T> read() {
+ inputGate.onAvailableBuffer(inputChannel.getInputChannel());
+
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
new file mode 100644
index 0000000..7eb728c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.util.TestBufferFactory;
+import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
+import org.junit.Test;
+import org.mockito.Matchers;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class RemoteInputChannelTest {
+
+ @Test
+ public void testExceptionOnReordering() throws Exception {
+ // Setup
+ final SingleInputGate inputGate = mock(SingleInputGate.class);
+ final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
+
+ // The test
+ inputChannel.onBuffer(TestBufferFactory.getMockBuffer(), 0);
+
+ // This does not yet throw the exception, but sets the error at the channel.
+ inputChannel.onBuffer(TestBufferFactory.getMockBuffer(), 29);
+
+ try {
+ inputChannel.getNextBuffer();
+
+ fail("Did not throw expected exception after enqueuing an out-of-order buffer.");
+ }
+ catch (Exception expected) {
+ }
+
+ // Need to notify the input gate for the out-of-order buffer as well. Otherwise the
+ // receiving task will not notice the error.
+ verify(inputGate, times(2)).onAvailableBuffer(eq(inputChannel));
+ }
+
+ @Test(timeout = 120 * 1000)
+ public void testConcurrentOnBufferAndRelease() throws Exception {
+ // Config
+ // Repeatedly spawn two tasks: one to queue buffers and the other to release the channel
+ // concurrently. We do this repeatedly to provoke races.
+ final int numberOfRepetitions = 8192;
+
+ // Setup
+ final ExecutorService executor = Executors.newFixedThreadPool(2);
+
+ try {
+ // Test
+ final SingleInputGate inputGate = mock(SingleInputGate.class);
+
+ for (int i = 0; i < numberOfRepetitions; i++) {
+ final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
+
+ final Callable<Void> enqueueTask = new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ while (true) {
+ for (int j = 0; j < 128; j++) {
+ inputChannel.onBuffer(TestBufferFactory.getMockBuffer(), j);
+ }
+
+ if (inputChannel.isReleased()) {
+ return null;
+ }
+ }
+ }
+ };
+
+ final Callable<Void> releaseTask = new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ inputChannel.releaseAllResources();
+
+ return null;
+ }
+ };
+
+ // Submit tasks and wait to finish
+ List<Future<Void>> results = Lists.newArrayListWithCapacity(2);
+
+ results.add(executor.submit(enqueueTask));
+ results.add(executor.submit(releaseTask));
+
+ for (Future<Void> result : results) {
+ result.get();
+ }
+
+ assertEquals("Resource leak during concurrent release and enqueue.",
+ 0, inputChannel.getNumberOfQueuedBuffers());
+ }
+ }
+ finally {
+ executor.shutdown();
+ }
+ }
+
+ // ---------------------------------------------------------------------------------------------
+
+ private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate)
+ throws IOException, InterruptedException {
+
+ final ConnectionManager connectionManager = mock(ConnectionManager.class);
+ when(connectionManager.createPartitionRequestClient(any(ConnectionID.class)))
+ .thenReturn(mock(PartitionRequestClient.class));
+
+ return new RemoteInputChannel(
+ inputGate,
+ 0,
+ new ResultPartitionID(),
+ mock(ConnectionID.class),
+ connectionManager);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index e1e3cff..5871514 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
-import org.apache.flink.runtime.io.network.util.TestInputChannel;
import org.apache.flink.runtime.io.network.util.TestTaskEvent;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
new file mode 100644
index 0000000..3dafffd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.util.TestTaskEvent;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mockito.stubbing.OngoingStubbing;
+
+import java.io.IOException;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * A mocked input channel.
+ */
+/**
+ * Test helper around a Mockito-mocked {@link InputChannel} whose {@code getNextBuffer()} results
+ * are scripted one call at a time.
+ */
+public class TestInputChannel {
+
+ private final InputChannel mock = Mockito.mock(InputChannel.class);
+
+ private final SingleInputGate inputGate;
+
+ // Abusing Mockito here... ;)
+ protected OngoingStubbing<Buffer> stubbing;
+
+ /**
+ * @param inputGate gate that is notified whenever a result is scripted; must not be null
+ * @param channelIndex non-negative index the mocked channel reports
+ */
+ public TestInputChannel(SingleInputGate inputGate, int channelIndex) {
+ checkArgument(channelIndex >= 0);
+ this.inputGate = checkNotNull(inputGate);
+
+ when(mock.getChannelIndex()).thenReturn(channelIndex);
+ }
+
+ /**
+ * Scripts the given buffer as the next {@code getNextBuffer()} result and notifies the gate.
+ */
+ public TestInputChannel read(Buffer buffer) throws IOException, InterruptedException {
+ // Open the stubbing chain on first use, extend it on every later call.
+ stubbing = (stubbing == null)
+ ? when(mock.getNextBuffer()).thenReturn(buffer)
+ : stubbing.thenReturn(buffer);
+
+ inputGate.onAvailableBuffer(mock);
+
+ return this;
+ }
+
+ /**
+ * Scripts a mocked buffer whose {@code isBuffer()} returns true.
+ */
+ public TestInputChannel readBuffer() throws IOException, InterruptedException {
+ final Buffer dataBuffer = mock(Buffer.class);
+ when(dataBuffer.isBuffer()).thenReturn(true);
+
+ return read(dataBuffer);
+ }
+
+ /**
+ * Scripts a serialized {@link TestTaskEvent}.
+ */
+ public TestInputChannel readEvent() throws IOException, InterruptedException {
+ final Buffer eventBuffer = EventSerializer.toBuffer(new TestTaskEvent());
+
+ return read(eventBuffer);
+ }
+
+ /**
+ * Scripts a serialized end-of-superstep event.
+ */
+ public TestInputChannel readEndOfSuperstepEvent() throws IOException, InterruptedException {
+ final Buffer eventBuffer = EventSerializer.toBuffer(EndOfSuperstepEvent.INSTANCE);
+
+ return read(eventBuffer);
+ }
+
+ /**
+ * Scripts a serialized end-of-partition event. When that result is fetched, the mocked
+ * channel is additionally stubbed to report itself as released.
+ */
+ public TestInputChannel readEndOfPartitionEvent() throws IOException, InterruptedException {
+ final Answer<Buffer> releaseAndFinish = new Answer<Buffer>() {
+ @Override
+ public Buffer answer(InvocationOnMock invocation) throws Throwable {
+ // Return true after finishing
+ when(mock.isReleased()).thenReturn(true);
+
+ return EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+ }
+ };
+
+ // Open the stubbing chain on first use, extend it on every later call.
+ stubbing = (stubbing == null)
+ ? when(mock.getNextBuffer()).thenAnswer(releaseAndFinish)
+ : stubbing.thenAnswer(releaseAndFinish);
+
+ inputGate.onAvailableBuffer(mock);
+
+ return this;
+ }
+
+ /**
+ * @return the underlying mocked channel
+ */
+ public InputChannel getInputChannel() {
+ return mock;
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates test input channels and attaches them to the specified input gate.
+ *
+ * @return The created test input channels.
+ */
+ public static TestInputChannel[] createInputChannels(SingleInputGate inputGate, int numberOfInputChannels) {
+ checkNotNull(inputGate);
+ checkArgument(numberOfInputChannels > 0);
+
+ final TestInputChannel[] channels = new TestInputChannel[numberOfInputChannels];
+
+ for (int index = 0; index < channels.length; index++) {
+ final TestInputChannel channel = new TestInputChannel(inputGate, index);
+ channels[index] = channel;
+
+ inputGate.setInputChannel(new IntermediateResultPartitionID(), channel.getInputChannel());
+ }
+
+ return channels;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
new file mode 100644
index 0000000..5033b3d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkElementIndex;
+import static org.mockito.Mockito.spy;
+
+/**
+ * A test input gate to mock reading data.
+ */
+/**
+ * Test helper that owns a spied {@link SingleInputGate} together with its
+ * {@link TestInputChannel}s and offers chainable methods to script what the channels deliver.
+ */
+public class TestSingleInputGate {
+
+ protected final SingleInputGate inputGate;
+
+ protected final TestInputChannel[] inputChannels;
+
+ /**
+ * Creates the gate and initializes all of its test input channels.
+ */
+ public TestSingleInputGate(int numberOfInputChannels) {
+ this(numberOfInputChannels, true);
+ }
+
+ /**
+ * @param numberOfInputChannels number of channels of the gate; at least 1
+ * @param initialize whether to create the test channels and attach them to the gate; if false,
+ * the channel array is left unpopulated
+ */
+ public TestSingleInputGate(int numberOfInputChannels, boolean initialize) {
+ checkArgument(numberOfInputChannels >= 1);
+
+ this.inputGate = spy(new SingleInputGate(new IntermediateDataSetID(), 0, numberOfInputChannels));
+
+ this.inputChannels = new TestInputChannel[numberOfInputChannels];
+
+ if (!initialize) {
+ return;
+ }
+
+ for (int index = 0; index < numberOfInputChannels; index++) {
+ final TestInputChannel channel = new TestInputChannel(inputGate, index);
+ inputChannels[index] = channel;
+ inputGate.setInputChannel(new IntermediateResultPartitionID(), channel.getInputChannel());
+ }
+ }
+
+ /**
+ * Scripts the given buffer on the channel with the given index, after validating the index
+ * against the gate's number of input channels.
+ */
+ public TestSingleInputGate read(Buffer buffer, int channelIndex) throws IOException, InterruptedException {
+ checkElementIndex(channelIndex, inputGate.getNumberOfInputChannels());
+
+ final TestInputChannel channel = inputChannels[channelIndex];
+ channel.read(buffer);
+
+ return this;
+ }
+
+ /**
+ * Scripts a data buffer on channel 0.
+ */
+ public TestSingleInputGate readBuffer() throws IOException, InterruptedException {
+ return readBuffer(0);
+ }
+
+ /**
+ * Scripts a data buffer on the given channel.
+ */
+ public TestSingleInputGate readBuffer(int channelIndex) throws IOException, InterruptedException {
+ final TestInputChannel channel = inputChannels[channelIndex];
+ channel.readBuffer();
+
+ return this;
+ }
+
+ /**
+ * Scripts a task event on channel 0.
+ */
+ public TestSingleInputGate readEvent() throws IOException, InterruptedException {
+ return readEvent(0);
+ }
+
+ /**
+ * Scripts a task event on the given channel.
+ */
+ public TestSingleInputGate readEvent(int channelIndex) throws IOException, InterruptedException {
+ final TestInputChannel channel = inputChannels[channelIndex];
+ channel.readEvent();
+
+ return this;
+ }
+
+ /**
+ * Scripts an end-of-superstep event on every channel, in index order.
+ */
+ public TestSingleInputGate readEndOfSuperstepEvent() throws IOException, InterruptedException {
+ for (int index = 0; index < inputChannels.length; index++) {
+ inputChannels[index].readEndOfSuperstepEvent();
+ }
+
+ return this;
+ }
+
+ /**
+ * Scripts an end-of-superstep event on the given channel.
+ */
+ public TestSingleInputGate readEndOfSuperstepEvent(int channelIndex) throws IOException, InterruptedException {
+ final TestInputChannel channel = inputChannels[channelIndex];
+ channel.readEndOfSuperstepEvent();
+
+ return this;
+ }
+
+ /**
+ * Scripts an end-of-partition event on every channel, in index order.
+ */
+ public TestSingleInputGate readEndOfPartitionEvent() throws IOException, InterruptedException {
+ for (int index = 0; index < inputChannels.length; index++) {
+ inputChannels[index].readEndOfPartitionEvent();
+ }
+
+ return this;
+ }
+
+ /**
+ * Scripts an end-of-partition event on the given channel.
+ */
+ public TestSingleInputGate readEndOfPartitionEvent(int channelIndex) throws IOException, InterruptedException {
+ final TestInputChannel channel = inputChannels[channelIndex];
+ channel.readEndOfPartitionEvent();
+
+ return this;
+ }
+
+ /**
+ * @return the spied input gate
+ */
+ public SingleInputGate getInputGate() {
+ return inputGate;
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Scripts one data buffer on every channel, visiting the channels in shuffled order.
+ *
+ * @return the channel indexes in the order in which they were visited
+ */
+ public List<Integer> readAllChannels() throws IOException, InterruptedException {
+ final int numberOfChannels = inputChannels.length;
+ final List<Integer> readOrder = new ArrayList<Integer>(numberOfChannels);
+
+ for (int index = 0; index < numberOfChannels; index++) {
+ readOrder.add(index);
+ }
+
+ Collections.shuffle(readOrder);
+
+ for (int position = 0; position < readOrder.size(); position++) {
+ final int channelIndex = readOrder.get(position);
+ inputChannels[channelIndex].readBuffer();
+ }
+
+ return readOrder;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index c7cb413..18e56ba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.io.network.partition.consumer;
-import org.apache.flink.runtime.io.network.util.TestInputChannel;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
index d10bf0c..0ff42b6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
@@ -34,6 +34,8 @@ public class TestBufferFactory {
private static final BufferRecycler discardingRecycler = new DiscardingRecycler();
+ private static final Buffer mockBuffer = createBuffer();
+
private final int bufferSize;
private final BufferRecycler bufferRecycler;
@@ -85,4 +87,8 @@ public class TestBufferFactory {
return new Buffer(new MemorySegment(new byte[bufferSize]), discardingRecycler);
}
+
+ /**
+ * Returns the buffer held in the static {@code mockBuffer} field. Every call hands out the
+ * same instance, so all callers share it.
+ */
+ public static Buffer getMockBuffer() {
+ return mockBuffer;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInputChannel.java
deleted file mode 100644
index 0e9e8e7..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInputChannel.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.util;
-
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
-import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.mockito.stubbing.OngoingStubbing;
-
-import java.io.IOException;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * A mocked input channel.
- */
-public class TestInputChannel {
-
- private final InputChannel mock = Mockito.mock(InputChannel.class);
-
- private final SingleInputGate inputGate;
-
- // Abusing Mockito here... ;)
- protected OngoingStubbing<Buffer> stubbing;
-
- public TestInputChannel(SingleInputGate inputGate, int channelIndex) {
- checkArgument(channelIndex >= 0);
- this.inputGate = checkNotNull(inputGate);
-
- when(mock.getChannelIndex()).thenReturn(channelIndex);
- }
-
- public TestInputChannel read(Buffer buffer) throws IOException, InterruptedException {
- if (stubbing == null) {
- stubbing = when(mock.getNextBuffer()).thenReturn(buffer);
- }
- else {
- stubbing = stubbing.thenReturn(buffer);
- }
-
- inputGate.onAvailableBuffer(mock);
-
- return this;
- }
-
- public TestInputChannel readBuffer() throws IOException, InterruptedException {
- final Buffer buffer = mock(Buffer.class);
- when(buffer.isBuffer()).thenReturn(true);
-
- return read(buffer);
- }
-
- public TestInputChannel readEvent() throws IOException, InterruptedException {
- return read(EventSerializer.toBuffer(new TestTaskEvent()));
- }
-
- public TestInputChannel readEndOfSuperstepEvent() throws IOException, InterruptedException {
- return read(EventSerializer.toBuffer(EndOfSuperstepEvent.INSTANCE));
- }
-
- public TestInputChannel readEndOfPartitionEvent() throws IOException, InterruptedException {
- final Answer<Buffer> answer = new Answer<Buffer>() {
- @Override
- public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable {
- // Return true after finishing
- when(mock.isReleased()).thenReturn(true);
-
- return EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
- }
- };
-
- if (stubbing == null) {
- stubbing = when(mock.getNextBuffer()).thenAnswer(answer);
- }
- else {
- stubbing = stubbing.thenAnswer(answer);
- }
-
- inputGate.onAvailableBuffer(mock);
-
- return this;
- }
-
- public InputChannel getInputChannel() {
- return mock;
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * Creates test input channels and attaches them to the specified input gate.
- *
- * @return The created test input channels.
- */
- public static TestInputChannel[] createInputChannels(SingleInputGate inputGate, int numberOfInputChannels) {
- checkNotNull(inputGate);
- checkArgument(numberOfInputChannels > 0);
-
- TestInputChannel[] mocks = new TestInputChannel[numberOfInputChannels];
-
- for (int i = 0; i < numberOfInputChannels; i++) {
- mocks[i] = new TestInputChannel(inputGate, i);
-
- inputGate.setInputChannel(new IntermediateResultPartitionID(), mocks[i].getInputChannel());
- }
-
- return mocks;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSingleInputGate.java
deleted file mode 100644
index d10e1a0..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSingleInputGate.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.util;
-
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkElementIndex;
-import static org.mockito.Mockito.spy;
-
-/**
- * A test input gate to mock reading data.
- */
-public class TestSingleInputGate {
-
- protected final SingleInputGate inputGate;
-
- protected final TestInputChannel[] inputChannels;
-
- public TestSingleInputGate(int numberOfInputChannels) {
- this(numberOfInputChannels, true);
- }
-
- public TestSingleInputGate(int numberOfInputChannels, boolean initialize) {
- checkArgument(numberOfInputChannels >= 1);
-
- this.inputGate = spy(new SingleInputGate(new IntermediateDataSetID(), 0, numberOfInputChannels));
-
- this.inputChannels = new TestInputChannel[numberOfInputChannels];
-
- if (initialize) {
- for (int i = 0; i < numberOfInputChannels; i++) {
- inputChannels[i] = new TestInputChannel(inputGate, i);
- inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannels[i].getInputChannel());
- }
- }
- }
-
- public TestSingleInputGate read(Buffer buffer, int channelIndex) throws IOException, InterruptedException {
- checkElementIndex(channelIndex, inputGate.getNumberOfInputChannels());
-
- inputChannels[channelIndex].read(buffer);
-
- return this;
- }
-
- public TestSingleInputGate readBuffer() throws IOException, InterruptedException {
- return readBuffer(0);
- }
-
- public TestSingleInputGate readBuffer(int channelIndex) throws IOException, InterruptedException {
- inputChannels[channelIndex].readBuffer();
-
- return this;
- }
-
- public TestSingleInputGate readEvent() throws IOException, InterruptedException {
- return readEvent(0);
- }
-
- public TestSingleInputGate readEvent(int channelIndex) throws IOException, InterruptedException {
- inputChannels[channelIndex].readEvent();
-
- return this;
- }
-
- public TestSingleInputGate readEndOfSuperstepEvent() throws IOException, InterruptedException {
- for (TestInputChannel inputChannel : inputChannels) {
- inputChannel.readEndOfSuperstepEvent();
- }
-
- return this;
- }
-
- public TestSingleInputGate readEndOfSuperstepEvent(int channelIndex) throws IOException, InterruptedException {
- inputChannels[channelIndex].readEndOfSuperstepEvent();
-
- return this;
- }
-
- public TestSingleInputGate readEndOfPartitionEvent() throws IOException, InterruptedException {
- for (TestInputChannel inputChannel : inputChannels) {
- inputChannel.readEndOfPartitionEvent();
- }
-
- return this;
- }
-
- public TestSingleInputGate readEndOfPartitionEvent(int channelIndex) throws IOException, InterruptedException {
- inputChannels[channelIndex].readEndOfPartitionEvent();
-
- return this;
- }
-
- public SingleInputGate getInputGate() {
- return inputGate;
- }
-
- // ------------------------------------------------------------------------
-
- public List<Integer> readAllChannels() throws IOException, InterruptedException {
- final List<Integer> readOrder = new ArrayList<Integer>(inputChannels.length);
-
- for (int i = 0; i < inputChannels.length; i++) {
- readOrder.add(i);
- }
-
- Collections.shuffle(readOrder);
-
- for (int channelIndex : readOrder) {
- inputChannels[channelIndex].readBuffer();
- }
-
- return readOrder;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
index 63ce5e2..e91d338 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators;
import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory;
import org.apache.flink.api.java.record.io.DelimitedOutputFormat;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.io.network.api.reader.IteratorWrappingTestSingleInputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.IteratorWrappingTestSingleInputGate;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.operators.testutils.InfiniteInputIterator;
import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 6625bbc..d5c9fbb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -27,7 +27,7 @@ import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.api.reader.IteratorWrappingTestSingleInputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.IteratorWrappingTestSingleInputGate;
import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
index b93d37e..6ffc97b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.java.record.io.FileOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.io.network.api.reader.IteratorWrappingTestSingleInputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.IteratorWrappingTestSingleInputGate;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.operators.PactDriver;