You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/03/18 17:48:53 UTC
[04/13] flink git commit: [FLINK-1350] [runtime] Add blocking result
partition variant
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockConsumer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockConsumer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockConsumer.java
deleted file mode 100644
index 62375a6..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockConsumer.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.util;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.queue.IntermediateResultPartitionQueueIterator;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicReference;
-
-public class MockConsumer implements Callable<Boolean> {
-
- private static final int SLEEP_TIME_MS = 20;
-
- private final IntermediateResultPartitionQueueIterator iterator;
-
- private final boolean slowConsumer;
-
- private final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
-
- public MockConsumer(IntermediateResultPartitionQueueIterator iterator, boolean slowConsumer) {
- this.iterator = iterator;
- this.slowConsumer = slowConsumer;
- }
-
- @Override
- public Boolean call() throws Exception {
- MockNotificationListener listener = new MockNotificationListener();
-
- int currentNumber = 0;
-
- try {
- while (true) {
- Buffer buffer = iterator.getNextBuffer();
-
- if (slowConsumer) {
- Thread.sleep(SLEEP_TIME_MS);
- }
-
- if (buffer == null) {
- if (iterator.subscribe(listener)) {
- listener.waitForNotification();
- }
- else if (iterator.isConsumed()) {
- break;
- }
- }
- else {
- try {
- if (buffer.isBuffer()) {
- currentNumber = verifyBufferFilledWithAscendingNumbers(buffer, currentNumber);
- }
- }
- finally {
- buffer.recycle();
- }
- }
- }
- }
- catch (Throwable t) {
- error.compareAndSet(null, t);
- return false;
- }
-
- return true;
- }
-
- public Throwable getError() {
- return error.get();
- }
-
- private int verifyBufferFilledWithAscendingNumbers(Buffer buffer, int currentNumber) {
- MemorySegment segment = buffer.getMemorySegment();
-
- for (int i = 4; i < segment.size(); i += 4) {
- if (segment.getInt(i) != currentNumber++) {
- throw new IllegalStateException("Read unexpected number from buffer.");
- }
- }
-
- return currentNumber;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockInputChannel.java
deleted file mode 100644
index 301169a..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockInputChannel.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.util;
-
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
-import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.mockito.stubbing.OngoingStubbing;
-
-import java.io.IOException;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * A mocked input channel.
- */
-public class MockInputChannel {
-
- private final InputChannel mock = Mockito.mock(InputChannel.class);
-
- private final SingleInputGate inputGate;
-
- // Abusing Mockito here... ;)
- protected OngoingStubbing<Buffer> stubbing;
-
- public MockInputChannel(SingleInputGate inputGate, int channelIndex) {
- checkArgument(channelIndex >= 0);
- this.inputGate = checkNotNull(inputGate);
-
- when(mock.getChannelIndex()).thenReturn(channelIndex);
- }
-
- public MockInputChannel read(Buffer buffer) throws IOException {
- if (stubbing == null) {
- stubbing = when(mock.getNextBuffer()).thenReturn(buffer);
- }
- else {
- stubbing = stubbing.thenReturn(buffer);
- }
-
- inputGate.onAvailableBuffer(mock);
-
- return this;
- }
-
- public MockInputChannel readBuffer() throws IOException {
- final Buffer buffer = mock(Buffer.class);
- when(buffer.isBuffer()).thenReturn(true);
-
- return read(buffer);
- }
-
- public MockInputChannel readEvent() throws IOException {
- return read(EventSerializer.toBuffer(new TestTaskEvent()));
- }
-
- public MockInputChannel readEndOfSuperstepEvent() throws IOException {
- return read(EventSerializer.toBuffer(EndOfSuperstepEvent.INSTANCE));
- }
-
- public MockInputChannel readEndOfPartitionEvent() throws IOException {
- final Answer<Buffer> answer = new Answer<Buffer>() {
- @Override
- public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable {
- // Return true after finishing
- when(mock.isReleased()).thenReturn(true);
-
- return EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
- }
- };
-
- if (stubbing == null) {
- stubbing = when(mock.getNextBuffer()).thenAnswer(answer);
- }
- else {
- stubbing = stubbing.thenAnswer(answer);
- }
-
- inputGate.onAvailableBuffer(mock);
-
- return this;
- }
-
- public InputChannel getInputChannel() {
- return mock;
- }
-
- // ------------------------------------------------------------------------
-
- public static MockInputChannel[] createInputChannels(SingleInputGate inputGate, int numberOfInputChannels) {
- checkNotNull(inputGate);
- checkArgument(numberOfInputChannels > 0);
-
- MockInputChannel[] mocks = new MockInputChannel[numberOfInputChannels];
-
- for (int i = 0; i < numberOfInputChannels; i++) {
- mocks[i] = new MockInputChannel(inputGate, i);
-
- inputGate.setInputChannel(new IntermediateResultPartitionID(), mocks[i].getInputChannel());
- }
-
- return mocks;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockNotificationListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockNotificationListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockNotificationListener.java
deleted file mode 100644
index 56e0025..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockNotificationListener.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.util;
-
-import org.apache.flink.runtime.util.event.NotificationListener;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class MockNotificationListener implements NotificationListener {
-
- final AtomicInteger numNotifications = new AtomicInteger();
-
- @Override
- public void onNotification() {
- synchronized (numNotifications) {
- numNotifications.incrementAndGet();
-
- numNotifications.notifyAll();
- }
- }
-
- public void waitForNotification() throws InterruptedException {
-
- int current = numNotifications.get();
-
- synchronized (numNotifications) {
- while (current == numNotifications.get()) {
- numNotifications.wait();
- }
- }
- }
-
- public int getNumberOfNotifications() {
- return numNotifications.get();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockProducer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockProducer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockProducer.java
deleted file mode 100644
index 44d8ffe..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockProducer.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.util;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.partition.queue.IntermediateResultPartitionQueue;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-public class MockProducer implements Callable<Boolean> {
-
- private static final int SLEEP_TIME_MS = 20;
-
- private final IntermediateResultPartitionQueue queue;
-
- private final BufferPool bufferPool;
-
- private final int numBuffersToProduce;
-
- private final boolean slowProducer;
-
- private final AtomicInteger discardAfter = new AtomicInteger(Integer.MAX_VALUE);
-
- private final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
-
- public MockProducer(IntermediateResultPartitionQueue queue, BufferPool bufferPool, int numBuffersToProduce, boolean slowProducer) {
- this.queue = queue;
- this.bufferPool = bufferPool;
- this.numBuffersToProduce = numBuffersToProduce;
- this.slowProducer = slowProducer;
- }
-
- @Override
- public Boolean call() throws Exception {
- try {
- int currentNumber = 0;
-
- for (int i = 0; i < numBuffersToProduce; i++) {
- if (i >= discardAfter.get()) {
- queue.discard();
- return true;
- }
-
- Buffer buffer = bufferPool.requestBufferBlocking();
-
- currentNumber = fillBufferWithAscendingNumbers(buffer, currentNumber);
-
- queue.add(buffer);
-
- if (slowProducer) {
- Thread.sleep(SLEEP_TIME_MS);
- }
- }
-
- queue.finish();
- }
- catch (Throwable t) {
- error.compareAndSet(null, t);
- return false;
- }
-
- return true;
- }
-
- void discard() {
- discardAfter.set(0);
- }
-
- public void discardAfter(int numBuffers) {
- discardAfter.set(numBuffers);
- }
-
- public Throwable getError() {
- return error.get();
- }
-
- public static int fillBufferWithAscendingNumbers(Buffer buffer, int currentNumber) {
- MemorySegment segment = buffer.getMemorySegment();
-
- for (int i = 4; i < segment.size(); i += 4) {
- segment.putInt(i, currentNumber++);
- }
-
- return currentNumber;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockSingleInputGate.java
deleted file mode 100644
index 3c708ac..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockSingleInputGate.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.util;
-
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkElementIndex;
-import static org.mockito.Mockito.spy;
-
-public class MockSingleInputGate {
-
- protected final SingleInputGate inputGate;
-
- protected final MockInputChannel[] inputChannels;
-
- public MockSingleInputGate(int numberOfInputChannels) {
- this(numberOfInputChannels, true);
- }
-
- public MockSingleInputGate(int numberOfInputChannels, boolean initialize) {
- checkArgument(numberOfInputChannels >= 1);
-
- this.inputGate = spy(new SingleInputGate(new IntermediateDataSetID(), 0, numberOfInputChannels));
-
- this.inputChannels = new MockInputChannel[numberOfInputChannels];
-
- if (initialize) {
- for (int i = 0; i < numberOfInputChannels; i++) {
- inputChannels[i] = new MockInputChannel(inputGate, i);
- inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannels[i].getInputChannel());
- }
- }
- }
-
- public MockSingleInputGate read(Buffer buffer, int channelIndex) throws IOException {
- checkElementIndex(channelIndex, inputGate.getNumberOfInputChannels());
-
- inputChannels[channelIndex].read(buffer);
-
- return this;
- }
-
- public MockSingleInputGate readBuffer() throws IOException {
- return readBuffer(0);
- }
-
- public MockSingleInputGate readBuffer(int channelIndex) throws IOException {
- inputChannels[channelIndex].readBuffer();
-
- return this;
- }
-
- public MockSingleInputGate readEvent() throws IOException {
- return readEvent(0);
- }
-
- public MockSingleInputGate readEvent(int channelIndex) throws IOException {
- inputChannels[channelIndex].readEvent();
-
- return this;
- }
-
- public MockSingleInputGate readEndOfSuperstepEvent() throws IOException {
- for (MockInputChannel inputChannel : inputChannels) {
- inputChannel.readEndOfSuperstepEvent();
- }
-
- return this;
- }
-
- public MockSingleInputGate readEndOfSuperstepEvent(int channelIndex) throws IOException {
- inputChannels[channelIndex].readEndOfSuperstepEvent();
-
- return this;
- }
-
- public MockSingleInputGate readEndOfPartitionEvent() throws IOException {
- for (MockInputChannel inputChannel : inputChannels) {
- inputChannel.readEndOfPartitionEvent();
- }
-
- return this;
- }
-
- public MockSingleInputGate readEndOfPartitionEvent(int channelIndex) throws IOException {
- inputChannels[channelIndex].readEndOfPartitionEvent();
-
- return this;
- }
-
- public SingleInputGate getInputGate() {
- return inputGate;
- }
-
- // ------------------------------------------------------------------------
-
- public List<Integer> readAllChannels() throws IOException {
- final List<Integer> readOrder = new ArrayList<Integer>(inputChannels.length);
-
- for (int i = 0; i < inputChannels.length; i++) {
- readOrder.add(i);
- }
-
- Collections.shuffle(readOrder);
-
- for (int channelIndex : readOrder) {
- inputChannels[channelIndex].readBuffer();
- }
-
- return readOrder;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
new file mode 100644
index 0000000..d10bf0c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.util;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.testutils.DiscardingRecycler;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class TestBufferFactory {
+
+ private static final int defaultSize = 32 * 1024;
+
+ private static final BufferRecycler discardingRecycler = new DiscardingRecycler();
+
+ private final int bufferSize;
+
+ private final BufferRecycler bufferRecycler;
+
+ private AtomicInteger numberOfCreatedBuffers = new AtomicInteger();
+
+ public TestBufferFactory() {
+ this(defaultSize, discardingRecycler);
+ }
+
+ public TestBufferFactory(int bufferSize) {
+ this(bufferSize, discardingRecycler);
+ }
+
+ public TestBufferFactory(int bufferSize, BufferRecycler bufferRecycler) {
+ checkArgument(bufferSize > 0);
+ this.bufferSize = bufferSize;
+ this.bufferRecycler = checkNotNull(bufferRecycler);
+ }
+
+ public Buffer create() {
+ numberOfCreatedBuffers.incrementAndGet();
+
+ return new Buffer(new MemorySegment(new byte[bufferSize]), bufferRecycler);
+ }
+
+ public Buffer createFrom(MemorySegment segment) {
+ return new Buffer(segment, bufferRecycler);
+ }
+
+ public int getNumberOfCreatedBuffers() {
+ return numberOfCreatedBuffers.get();
+ }
+
+ public int getBufferSize() {
+ return bufferSize;
+ }
+
+ // ------------------------------------------------------------------------
+ // Static test helpers
+ // ------------------------------------------------------------------------
+
+ public static Buffer createBuffer() {
+ return createBuffer(defaultSize);
+ }
+
+ public static Buffer createBuffer(int bufferSize) {
+ checkArgument(bufferSize > 0);
+
+ return new Buffer(new MemorySegment(new byte[bufferSize]), discardingRecycler);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestConsumerCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestConsumerCallback.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestConsumerCallback.java
new file mode 100644
index 0000000..52083c4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestConsumerCallback.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.util;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+
+public interface TestConsumerCallback {
+
+ void onBuffer(Buffer buffer);
+
+ void onEvent(AbstractEvent event);
+
+ public static class CountingCallback implements TestConsumerCallback {
+
+ private final AtomicInteger numberOfReadBuffers = new AtomicInteger();
+
+ private final AtomicInteger numberOfReadEvents = new AtomicInteger();
+
+ @Override
+ public void onBuffer(Buffer buffer) {
+ numberOfReadBuffers.incrementAndGet();
+ }
+
+ @Override
+ public void onEvent(AbstractEvent event) {
+ numberOfReadEvents.incrementAndGet();
+ }
+
+ /**
+ * Returns the number of read buffers.
+ */
+ public int getNumberOfReadBuffers() {
+ return numberOfReadBuffers.get();
+ }
+
+ /**
+ * Returns the number of read events.
+ */
+ public int getNumberOfReadEvents() {
+ return numberOfReadEvents.get();
+ }
+ }
+
+ public static class RecyclingCallback extends CountingCallback {
+
+ @Override
+ public void onBuffer(Buffer buffer) {
+ super.onBuffer(buffer);
+
+ buffer.recycle();
+ }
+
+ @Override
+ public void onEvent(AbstractEvent event) {
+ super.onEvent(event);
+ }
+ }
+
+ public class VerifyAscendingCallback extends RecyclingCallback {
+
+ @Override
+ public void onBuffer(Buffer buffer) {
+ final MemorySegment segment = buffer.getMemorySegment();
+
+ int expected = getNumberOfReadBuffers() * (segment.size() / 4);
+
+ for (int i = 0; i < segment.size(); i += 4) {
+ assertEquals(expected, segment.getInt(i));
+
+ expected++;
+ }
+
+ super.onBuffer(buffer);
+ }
+
+ @Override
+ public void onEvent(AbstractEvent event) {
+ super.onEvent(event);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInfiniteBufferProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInfiniteBufferProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInfiniteBufferProvider.java
new file mode 100644
index 0000000..976e63d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInfiniteBufferProvider.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.util;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.util.event.EventListener;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class TestInfiniteBufferProvider implements BufferProvider {
+
+ private final ConcurrentLinkedQueue<Buffer> buffers = new ConcurrentLinkedQueue<Buffer>();
+
+ private final TestBufferFactory bufferFactory = new TestBufferFactory(
+ 32 * 1024, new InfiniteBufferProviderRecycler(buffers));
+
+ @Override
+ public Buffer requestBuffer() throws IOException {
+ Buffer buffer = buffers.poll();
+
+ if (buffer != null) {
+ return buffer;
+ }
+
+ return bufferFactory.create();
+ }
+
+ @Override
+ public Buffer requestBufferBlocking() throws IOException, InterruptedException {
+ return requestBuffer();
+ }
+
+ @Override
+ public boolean addListener(EventListener<Buffer> listener) {
+ return false;
+ }
+
+ @Override
+ public boolean isDestroyed() {
+ return false;
+ }
+
+ @Override
+ public int getMemorySegmentSize() {
+ return bufferFactory.getBufferSize();
+ }
+
+ private static class InfiniteBufferProviderRecycler implements BufferRecycler {
+
+ private final ConcurrentLinkedQueue<Buffer> buffers;
+
+ public InfiniteBufferProviderRecycler(ConcurrentLinkedQueue<Buffer> buffers) {
+ this.buffers = buffers;
+ }
+
+ @Override
+ public void recycle(MemorySegment segment) {
+ buffers.add(new Buffer(segment, this));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInputChannel.java
new file mode 100644
index 0000000..306de4c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInputChannel.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.util;
+
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mockito.stubbing.OngoingStubbing;
+
+import java.io.IOException;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * A mocked input channel.
+ */
+public class TestInputChannel {
+
+ private final InputChannel mock = Mockito.mock(InputChannel.class);
+
+ private final SingleInputGate inputGate;
+
+ // Abusing Mockito here... ;)
+ protected OngoingStubbing<Buffer> stubbing;
+
+ public TestInputChannel(SingleInputGate inputGate, int channelIndex) {
+ checkArgument(channelIndex >= 0);
+ this.inputGate = checkNotNull(inputGate);
+
+ when(mock.getChannelIndex()).thenReturn(channelIndex);
+ }
+
+ public TestInputChannel read(Buffer buffer) throws IOException, InterruptedException {
+ if (stubbing == null) {
+ stubbing = when(mock.getNextBuffer()).thenReturn(buffer);
+ }
+ else {
+ stubbing = stubbing.thenReturn(buffer);
+ }
+
+ inputGate.onAvailableBuffer(mock);
+
+ return this;
+ }
+
+ public TestInputChannel readBuffer() throws IOException, InterruptedException {
+ final Buffer buffer = mock(Buffer.class);
+ when(buffer.isBuffer()).thenReturn(true);
+
+ return read(buffer);
+ }
+
+ public TestInputChannel readEvent() throws IOException, InterruptedException {
+ return read(EventSerializer.toBuffer(new TestTaskEvent()));
+ }
+
+ public TestInputChannel readEndOfSuperstepEvent() throws IOException, InterruptedException {
+ return read(EventSerializer.toBuffer(EndOfSuperstepEvent.INSTANCE));
+ }
+
+ public TestInputChannel readEndOfPartitionEvent() throws IOException, InterruptedException {
+ final Answer<Buffer> answer = new Answer<Buffer>() {
+ @Override
+ public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable {
+ // Report the channel as released once the end-of-partition event has been read
+ when(mock.isReleased()).thenReturn(true);
+
+ return EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+ }
+ };
+
+ if (stubbing == null) {
+ stubbing = when(mock.getNextBuffer()).thenAnswer(answer);
+ }
+ else {
+ stubbing = stubbing.thenAnswer(answer);
+ }
+
+ inputGate.onAvailableBuffer(mock);
+
+ return this;
+ }
+
+ public InputChannel getInputChannel() {
+ return mock;
+ }
+
+ // ------------------------------------------------------------------------
+
+ public static TestInputChannel[] createInputChannels(SingleInputGate inputGate, int numberOfInputChannels) {
+ checkNotNull(inputGate);
+ checkArgument(numberOfInputChannels > 0);
+
+ TestInputChannel[] mocks = new TestInputChannel[numberOfInputChannels];
+
+ for (int i = 0; i < numberOfInputChannels; i++) {
+ mocks[i] = new TestInputChannel(inputGate, i);
+
+ inputGate.setInputChannel(new IntermediateResultPartitionID(), mocks[i].getInputChannel());
+ }
+
+ return mocks;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestNotificationListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestNotificationListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestNotificationListener.java
new file mode 100644
index 0000000..1e943a6
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestNotificationListener.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.util;
+
+import org.apache.flink.runtime.util.event.NotificationListener;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A mock notification listener.
+ */
+public class TestNotificationListener implements NotificationListener {
+
+ final AtomicInteger numberOfNotifications = new AtomicInteger();
+
+ @Override
+ public void onNotification() {
+ synchronized (numberOfNotifications) {
+ numberOfNotifications.incrementAndGet();
+
+ numberOfNotifications.notifyAll();
+ }
+ }
+
+ /**
+ * Waits for a notification, i.e. blocks while the notification count still equals {@code current}.
+ *
+ * <p> <strong>Important</strong>: It's necessary to get the current number of notifications
+ * <em>before</em> registering the listener. Otherwise the wait call may block indefinitely.
+ *
+ * <pre>
+ * TestNotificationListener listener = new TestNotificationListener();
+ *
+ * int current = listener.getNumberOfNotifications();
+ *
+ * // Register the listener
+ * register(listener);
+ *
+ * listener.waitForNotification(current);
+ * </pre>
+ */
+ public void waitForNotification(int current) throws InterruptedException {
+ synchronized (numberOfNotifications) {
+ while (current == numberOfNotifications.get()) {
+ numberOfNotifications.wait();
+ }
+ }
+ }
+
+ public int getNumberOfNotifications() {
+ return numberOfNotifications.get();
+ }
+
+ public void reset() {
+ numberOfNotifications.set(0);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java
new file mode 100644
index 0000000..31fd4a4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.util;
+
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+import java.util.Random;
+import java.util.concurrent.Callable;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A test partition producer.
+ *
+ * <p> The behaviour of the producer is customizable by specifying a source.
+ *
+ * @see TestProducerSource
+ */
+public class TestPartitionProducer implements Callable<Boolean> {
+
+ public static final int MAX_SLEEP_TIME_MS = 20;
+
+ /** The partition to add data to. */
+ private final ResultPartition partition;
+
+ /**
+ * Flag indicating whether the producer is slow. If true, the producer will sleep a random
+ * number of milliseconds between adding data.
+ */
+ private final boolean isSlowProducer;
+
+ /** The source data. */
+ private final TestProducerSource source;
+
+ /** Random source for sleeps. */
+ private final Random random;
+
+ public TestPartitionProducer(
+ ResultPartition partition,
+ boolean isSlowProducer,
+ TestProducerSource source) {
+
+ this.partition = checkNotNull(partition);
+ this.isSlowProducer = isSlowProducer;
+ this.random = isSlowProducer ? new Random() : null;
+ this.source = checkNotNull(source);
+ }
+
+ @Override
+ public Boolean call() throws Exception {
+
+ try {
+ BufferOrEvent bufferOrEvent;
+
+ while ((bufferOrEvent = source.getNextBufferOrEvent()) != null) {
+ int targetChannelIndex = bufferOrEvent.getChannelIndex();
+
+ if (bufferOrEvent.isBuffer()) {
+ partition.add(bufferOrEvent.getBuffer(), targetChannelIndex);
+ }
+ else if (bufferOrEvent.isEvent()) {
+ final Buffer buffer = EventSerializer.toBuffer(bufferOrEvent.getEvent());
+
+ partition.add(buffer, targetChannelIndex);
+ }
+ else {
+ throw new IllegalStateException("BufferOrEvent instance w/o buffer nor event.");
+ }
+
+ // Check for interrupted flag after adding data to prevent resource leaks
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
+ }
+
+ if (isSlowProducer) {
+ Thread.sleep(random.nextInt(MAX_SLEEP_TIME_MS + 1));
+ }
+ }
+
+ partition.finish();
+
+ return true;
+ }
+ finally {
+ partition.release();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
new file mode 100644
index 0000000..4893360
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.util;
+
+import com.google.common.collect.Queues;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.util.event.EventListener;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class TestPooledBufferProvider implements BufferProvider {
+
+ private final Object bufferCreationLock = new Object();
+
+ private final ArrayBlockingQueue<Buffer> buffers;
+
+ private final TestBufferFactory bufferFactory;
+
+ private final PooledBufferProviderRecycler bufferRecycler;
+
+ private final int poolSize;
+
+ public TestPooledBufferProvider(int poolSize) {
+ checkArgument(poolSize > 0);
+ this.poolSize = poolSize;
+
+ this.buffers = new ArrayBlockingQueue<Buffer>(poolSize);
+ this.bufferRecycler = new PooledBufferProviderRecycler(buffers);
+ this.bufferFactory = new TestBufferFactory(32 * 1024, bufferRecycler);
+ }
+
+ @Override
+ public Buffer requestBuffer() throws IOException {
+ final Buffer buffer = buffers.poll();
+
+ if (buffer != null) {
+ return buffer;
+ }
+ else {
+ synchronized (bufferCreationLock) {
+ if (bufferFactory.getNumberOfCreatedBuffers() < poolSize) {
+ return bufferFactory.create();
+ }
+ }
+
+ return null;
+ }
+ }
+
+ @Override
+ public Buffer requestBufferBlocking() throws IOException, InterruptedException {
+ final Buffer buffer = buffers.poll();
+
+ if (buffer != null) {
+ return buffer;
+ }
+ else {
+ synchronized (bufferCreationLock) {
+ if (bufferFactory.getNumberOfCreatedBuffers() < poolSize) {
+ return bufferFactory.create();
+ }
+ }
+
+ return buffers.take();
+ }
+ }
+
+ @Override
+ public boolean addListener(EventListener<Buffer> listener) {
+ return bufferRecycler.registerListener(listener);
+ }
+
+ @Override
+ public boolean isDestroyed() {
+ return false;
+ }
+
+ @Override
+ public int getMemorySegmentSize() {
+ return bufferFactory.getBufferSize();
+ }
+
+ public int getNumberOfAvailableBuffers() {
+ return buffers.size();
+ }
+
+ private static class PooledBufferProviderRecycler implements BufferRecycler {
+
+ private final Object listenerRegistrationLock = new Object();
+
+ private final Queue<Buffer> buffers;
+
+ private final ConcurrentLinkedQueue<EventListener<Buffer>> registeredListeners =
+ Queues.newConcurrentLinkedQueue();
+
+ public PooledBufferProviderRecycler(Queue<Buffer> buffers) {
+ this.buffers = buffers;
+ }
+
+ @Override
+ public void recycle(MemorySegment segment) {
+ synchronized (listenerRegistrationLock) {
+ final Buffer buffer = new Buffer(segment, this);
+
+ EventListener<Buffer> listener = registeredListeners.poll();
+
+ if (listener == null) {
+ buffers.add(buffer);
+ }
+ else {
+ listener.onEvent(buffer);
+ }
+ }
+ }
+
+ boolean registerListener(EventListener<Buffer> listener) {
+ synchronized (listenerRegistrationLock) {
+ if (buffers.isEmpty()) {
+ registeredListeners.add(listener);
+
+ return true;
+ }
+
+ return false;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestProducerSource.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestProducerSource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestProducerSource.java
new file mode 100644
index 0000000..dea9df2
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestProducerSource.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.util;
+
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+public interface TestProducerSource {
+
+ /**
+ * Returns the next buffer or event instance.
+ *
+ * <p> The channel index specifies the subpartition to add the data to.
+ */
+ BufferOrEvent getNextBufferOrEvent() throws Exception;
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSingleInputGate.java
new file mode 100644
index 0000000..d10e1a0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSingleInputGate.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.util;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkElementIndex;
+import static org.mockito.Mockito.spy;
+
+/**
+ * A test input gate to mock reading data.
+ */
+public class TestSingleInputGate {
+
+ protected final SingleInputGate inputGate;
+
+ protected final TestInputChannel[] inputChannels;
+
+ public TestSingleInputGate(int numberOfInputChannels) {
+ this(numberOfInputChannels, true);
+ }
+
+ public TestSingleInputGate(int numberOfInputChannels, boolean initialize) {
+ checkArgument(numberOfInputChannels >= 1);
+
+ this.inputGate = spy(new SingleInputGate(new IntermediateDataSetID(), 0, numberOfInputChannels));
+
+ this.inputChannels = new TestInputChannel[numberOfInputChannels];
+
+ if (initialize) {
+ for (int i = 0; i < numberOfInputChannels; i++) {
+ inputChannels[i] = new TestInputChannel(inputGate, i);
+ inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannels[i].getInputChannel());
+ }
+ }
+ }
+
+ public TestSingleInputGate read(Buffer buffer, int channelIndex) throws IOException, InterruptedException {
+ checkElementIndex(channelIndex, inputGate.getNumberOfInputChannels());
+
+ inputChannels[channelIndex].read(buffer);
+
+ return this;
+ }
+
+ public TestSingleInputGate readBuffer() throws IOException, InterruptedException {
+ return readBuffer(0);
+ }
+
+ public TestSingleInputGate readBuffer(int channelIndex) throws IOException, InterruptedException {
+ inputChannels[channelIndex].readBuffer();
+
+ return this;
+ }
+
+ public TestSingleInputGate readEvent() throws IOException, InterruptedException {
+ return readEvent(0);
+ }
+
+ public TestSingleInputGate readEvent(int channelIndex) throws IOException, InterruptedException {
+ inputChannels[channelIndex].readEvent();
+
+ return this;
+ }
+
+ public TestSingleInputGate readEndOfSuperstepEvent() throws IOException, InterruptedException {
+ for (TestInputChannel inputChannel : inputChannels) {
+ inputChannel.readEndOfSuperstepEvent();
+ }
+
+ return this;
+ }
+
+ public TestSingleInputGate readEndOfSuperstepEvent(int channelIndex) throws IOException, InterruptedException {
+ inputChannels[channelIndex].readEndOfSuperstepEvent();
+
+ return this;
+ }
+
+ public TestSingleInputGate readEndOfPartitionEvent() throws IOException, InterruptedException {
+ for (TestInputChannel inputChannel : inputChannels) {
+ inputChannel.readEndOfPartitionEvent();
+ }
+
+ return this;
+ }
+
+ public TestSingleInputGate readEndOfPartitionEvent(int channelIndex) throws IOException, InterruptedException {
+ inputChannels[channelIndex].readEndOfPartitionEvent();
+
+ return this;
+ }
+
+ public SingleInputGate getInputGate() {
+ return inputGate;
+ }
+
+ // ------------------------------------------------------------------------
+
+ public List<Integer> readAllChannels() throws IOException, InterruptedException {
+ final List<Integer> readOrder = new ArrayList<Integer>(inputChannels.length);
+
+ for (int i = 0; i < inputChannels.length; i++) {
+ readOrder.add(i);
+ }
+
+ Collections.shuffle(readOrder);
+
+ for (int channelIndex : readOrder) {
+ inputChannels[channelIndex].readBuffer();
+ }
+
+ return readOrder;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
new file mode 100644
index 0000000..2766e53
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.util;
+
+import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+
+import java.util.Random;
+import java.util.concurrent.Callable;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A test subpartition view consumer.
+ *
+ * <p> The behaviour of the consumer is customizable by specifying a callback.
+ *
+ * @see TestConsumerCallback
+ */
+public class TestSubpartitionConsumer implements Callable<Boolean> {
+
+ private static final int MAX_SLEEP_TIME_MS = 20;
+
+ /** The subpartition view to consume. */
+ private final ResultSubpartitionView subpartitionView;
+
+ /**
+ * Flag indicating whether the consumer is slow. If true, the consumer will sleep a random
+ * number of milliseconds between returned buffers.
+ */
+ private final boolean isSlowConsumer;
+
+ /** The callback to handle a read buffer. */
+ private final TestConsumerCallback callback;
+
+ /** Random source for sleeps. */
+ private final Random random;
+
+ public TestSubpartitionConsumer(
+ ResultSubpartitionView subpartitionView,
+ boolean isSlowConsumer,
+ TestConsumerCallback callback) {
+
+ this.subpartitionView = checkNotNull(subpartitionView);
+ this.isSlowConsumer = isSlowConsumer;
+ this.random = isSlowConsumer ? new Random() : null;
+ this.callback = checkNotNull(callback);
+ }
+
+ @Override
+ public Boolean call() throws Exception {
+ final TestNotificationListener listener = new TestNotificationListener();
+
+ try {
+ while (true) {
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
+ }
+
+ final Buffer buffer = subpartitionView.getNextBuffer();
+
+ if (isSlowConsumer) {
+ Thread.sleep(random.nextInt(MAX_SLEEP_TIME_MS + 1));
+ }
+
+ if (buffer != null) {
+ if (buffer.isBuffer()) {
+ callback.onBuffer(buffer);
+ }
+ else {
+ final AbstractEvent event = EventSerializer.fromBuffer(buffer,
+ getClass().getClassLoader());
+
+ callback.onEvent(event);
+
+ buffer.recycle();
+
+ if (event.getClass() == EndOfPartitionEvent.class) {
+ subpartitionView.notifySubpartitionConsumed();
+
+ return true;
+ }
+ }
+ }
+ else {
+ int current = listener.getNumberOfNotifications();
+
+ if (subpartitionView.registerListener(listener)) {
+ listener.waitForNotification(current);
+ }
+ else if (subpartitionView.isReleased()) {
+ return true;
+ }
+ }
+ }
+ }
+ finally {
+ subpartitionView.releaseAllResources();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionProducer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionProducer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionProducer.java
new file mode 100644
index 0000000..52c156e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionProducer.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.util;
+
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+import java.util.Random;
+import java.util.concurrent.Callable;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A test subpartition producer.
+ *
+ * <p> The behaviour of the producer is customizable by specifying a source.
+ *
+ * @see TestProducerSource
+ */
+public class TestSubpartitionProducer implements Callable<Boolean> {
+
+ public static final int MAX_SLEEP_TIME_MS = 20;
+
+ /** The subpartition to add data to. */
+ private final ResultSubpartition subpartition;
+
+ /**
+ * Flag indicating whether the producer is slow. If true, the producer will sleep a random
+ * number of milliseconds between adding data.
+ */
+ private final boolean isSlowProducer;
+
+ /** The source data. */
+ private final TestProducerSource source;
+
+ /** Random source for sleeps. */
+ private final Random random;
+
+ public TestSubpartitionProducer(
+ ResultSubpartition subpartition,
+ boolean isSlowProducer,
+ TestProducerSource source) {
+
+ this.subpartition = checkNotNull(subpartition);
+ this.isSlowProducer = isSlowProducer;
+ this.random = isSlowProducer ? new Random() : null;
+ this.source = checkNotNull(source);
+ }
+
+ @Override
+ public Boolean call() throws Exception {
+
+ try {
+ BufferOrEvent bufferOrEvent;
+
+ while ((bufferOrEvent = source.getNextBufferOrEvent()) != null) {
+ if (bufferOrEvent.isBuffer()) {
+ subpartition.add(bufferOrEvent.getBuffer());
+ }
+ else if (bufferOrEvent.isEvent()) {
+ final Buffer buffer = EventSerializer.toBuffer(bufferOrEvent.getEvent());
+
+ subpartition.add(buffer);
+ }
+ else {
+ throw new IllegalStateException("BufferOrEvent instance w/o buffer nor event.");
+ }
+
+ // Check for interrupted flag after adding data to prevent resource leaks
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
+ }
+
+ if (isSlowProducer) {
+ Thread.sleep(random.nextInt(MAX_SLEEP_TIME_MS + 1));
+ }
+ }
+
+ subpartition.finish();
+
+ return true;
+ }
+ finally {
+ subpartition.release();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestTaskEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestTaskEvent.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestTaskEvent.java
index 4f547aa..0b29032 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestTaskEvent.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestTaskEvent.java
@@ -24,6 +24,9 @@ import org.apache.flink.runtime.event.task.TaskEvent;
import java.io.IOException;
+/**
+ * A task event used in various tests.
+ */
public class TestTaskEvent extends TaskEvent {
private double val0;
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
index 84fc851..63ce5e2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.operators;
import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory;
import org.apache.flink.api.java.record.io.DelimitedOutputFormat;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.io.network.api.reader.IteratorWrappingMockSingleInputGate;
-import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
+import org.apache.flink.runtime.io.network.api.reader.IteratorWrappingTestSingleInputGate;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.operators.testutils.InfiniteInputIterator;
import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
import org.apache.flink.runtime.operators.testutils.TaskTestBase;
@@ -51,7 +51,7 @@ import java.util.HashSet;
import java.util.Set;
@RunWith(PowerMockRunner.class)
-@PrepareForTest({Task.class, BufferWriter.class})
+@PrepareForTest({Task.class, ResultPartitionWriter.class})
public class DataSinkTaskTest extends TaskTestBase
{
private static final Logger LOG = LoggerFactory.getLogger(DataSinkTaskTest.class);
@@ -138,7 +138,7 @@ public class DataSinkTaskTest extends TaskTestBase
super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
- IteratorWrappingMockSingleInputGate<?>[] readers = new IteratorWrappingMockSingleInputGate[4];
+ IteratorWrappingTestSingleInputGate<?>[] readers = new IteratorWrappingTestSingleInputGate[4];
readers[0] = super.addInput(new UniformRecordGenerator(keyCnt, valCnt, 0, 0, false), 0, false);
readers[1] = super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt, 0, false), 0, false);
readers[2] = super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt * 2, 0, false), 0, false);
@@ -151,7 +151,7 @@ public class DataSinkTaskTest extends TaskTestBase
try {
// For the union reader to work, we need to start notifications *after* the union reader
// has been initialized.
- for (IteratorWrappingMockSingleInputGate<?> reader : readers) {
+ for (IteratorWrappingTestSingleInputGate<?> reader : readers) {
reader.read();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
index 90bb944..4548410 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
@@ -28,7 +28,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.taskmanager.Task;
import org.junit.Assert;
@@ -47,7 +47,7 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@RunWith(PowerMockRunner.class)
-@PrepareForTest({Task.class, BufferWriter.class})
+@PrepareForTest({Task.class, ResultPartitionWriter.class})
public class DataSourceTaskTest extends TaskTestBase {
private static final int MEMORY_MANAGER_SIZE = 1024 * 1024;
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
index e22789c..88a71c4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory;
import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.operators.CollectorMapDriver;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.RegularPactTask;
@@ -48,7 +48,7 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@RunWith(PowerMockRunner.class)
-@PrepareForTest({Task.class, BufferWriter.class})
+@PrepareForTest({Task.class, ResultPartitionWriter.class})
@SuppressWarnings("deprecation")
public class ChainTaskTest extends TaskTestBase {
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 7fb13e3..6625bbc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -27,10 +27,10 @@ import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.api.reader.IteratorWrappingMockSingleInputGate;
+import org.apache.flink.runtime.io.network.api.reader.IteratorWrappingTestSingleInputGate;
import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
-import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
@@ -71,7 +71,7 @@ public class MockEnvironment implements Environment {
private final List<InputGate> inputs;
- private final List<BufferWriter> outputs;
+ private final List<ResultPartitionWriter> outputs;
private final JobID jobID = new JobID();
@@ -83,7 +83,7 @@ public class MockEnvironment implements Environment {
this.jobConfiguration = new Configuration();
this.taskConfiguration = new Configuration();
this.inputs = new LinkedList<InputGate>();
- this.outputs = new LinkedList<BufferWriter>();
+ this.outputs = new LinkedList<ResultPartitionWriter>();
this.memManager = new DefaultMemoryManager(memorySize, 1);
this.ioManager = new IOManagerAsync();
@@ -91,9 +91,9 @@ public class MockEnvironment implements Environment {
this.bufferSize = bufferSize;
}
- public IteratorWrappingMockSingleInputGate<Record> addInput(MutableObjectIterator<Record> inputIterator) {
+ public IteratorWrappingTestSingleInputGate<Record> addInput(MutableObjectIterator<Record> inputIterator) {
try {
- final IteratorWrappingMockSingleInputGate<Record> reader = new IteratorWrappingMockSingleInputGate<Record>(bufferSize, Record.class, inputIterator);
+ final IteratorWrappingTestSingleInputGate<Record> reader = new IteratorWrappingTestSingleInputGate<Record>(bufferSize, Record.class, inputIterator);
inputs.add(reader.getInputGate());
@@ -118,7 +118,7 @@ public class MockEnvironment implements Environment {
}
});
- BufferWriter mockWriter = mock(BufferWriter.class);
+ ResultPartitionWriter mockWriter = mock(ResultPartitionWriter.class);
when(mockWriter.getNumberOfOutputChannels()).thenReturn(1);
when(mockWriter.getBufferProvider()).thenReturn(mockBufferProvider);
@@ -225,13 +225,13 @@ public class MockEnvironment implements Environment {
}
@Override
- public BufferWriter getWriter(int index) {
+ public ResultPartitionWriter getWriter(int index) {
return outputs.get(index);
}
@Override
- public BufferWriter[] getAllWriters() {
- return outputs.toArray(new BufferWriter[outputs.size()]);
+ public ResultPartitionWriter[] getAllWriters() {
+ return outputs.toArray(new ResultPartitionWriter[outputs.size()]);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
index e0776d9..b93d37e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.java.record.io.FileOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.io.network.api.reader.IteratorWrappingMockSingleInputGate;
+import org.apache.flink.runtime.io.network.api.reader.IteratorWrappingTestSingleInputGate;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.operators.PactDriver;
@@ -55,14 +55,14 @@ public abstract class TaskTestBase {
this.mockEnv = new MockEnvironment(this.memorySize, this.inputSplitProvider, bufferSize);
}
- public IteratorWrappingMockSingleInputGate<Record> addInput(MutableObjectIterator<Record> input, int groupId) {
- final IteratorWrappingMockSingleInputGate<Record> reader = addInput(input, groupId, true);
+ public IteratorWrappingTestSingleInputGate<Record> addInput(MutableObjectIterator<Record> input, int groupId) {
+ final IteratorWrappingTestSingleInputGate<Record> reader = addInput(input, groupId, true);
return reader;
}
- public IteratorWrappingMockSingleInputGate<Record> addInput(MutableObjectIterator<Record> input, int groupId, boolean read) {
- final IteratorWrappingMockSingleInputGate<Record> reader = this.mockEnv.addInput(input);
+ public IteratorWrappingTestSingleInputGate<Record> addInput(MutableObjectIterator<Record> input, int groupId, boolean read) {
+ final IteratorWrappingTestSingleInputGate<Record> reader = this.mockEnv.addInput(input);
TaskConfig conf = new TaskConfig(this.mockEnv.getTaskConfiguration());
conf.addInputToGroup(groupId);
conf.setInputSerializer(RecordSerializerFactory.get(), groupId);
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
index a7173b4..f4ee52f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
@@ -196,7 +196,7 @@ public class TaskManagerProcessReapingTest {
cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
- cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
+ cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 256);
TaskManager.runTaskManager("localhost", taskManagerPort, cfg);
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index dbc6e9d..c41e6b9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -29,16 +29,18 @@ import akka.util.Timeout;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.deployment.PartitionConsumerDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.PartitionDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.PartitionInfo;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -101,8 +103,8 @@ public class TaskManagerTest {
final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, vid, eid, "TestTask", 2, 7,
new Configuration(), new Configuration(), TestInvokableCorrect.class.getName(),
- Collections.<PartitionDeploymentDescriptor>emptyList(),
- Collections.<PartitionConsumerDeploymentDescriptor>emptyList(),
+ Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+ Collections.<InputGateDeploymentDescriptor>emptyList(),
new ArrayList<BlobKey>(), 0);
new Within(duration("1 seconds")){
@@ -140,14 +142,14 @@ public class TaskManagerTest {
final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid1, vid1, eid1, "TestTask1", 1, 5,
new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(),
- Collections.<PartitionDeploymentDescriptor>emptyList(),
- Collections.<PartitionConsumerDeploymentDescriptor>emptyList(),
+ Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+ Collections.<InputGateDeploymentDescriptor>emptyList(),
new ArrayList<BlobKey>(), 0);
final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid2, vid2, eid2, "TestTask2", 2, 7,
new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(),
- Collections.<PartitionDeploymentDescriptor>emptyList(),
- Collections.<PartitionConsumerDeploymentDescriptor>emptyList(),
+ Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+ Collections.<InputGateDeploymentDescriptor>emptyList(),
new ArrayList<BlobKey>(), 0);
final FiniteDuration d = duration("1 second");
@@ -243,14 +245,14 @@ public class TaskManagerTest {
final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1,
new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
- Collections.<PartitionDeploymentDescriptor>emptyList(),
- Collections.<PartitionConsumerDeploymentDescriptor>emptyList(),
+ Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+ Collections.<InputGateDeploymentDescriptor>emptyList(),
new ArrayList<BlobKey>(), 0);
final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7,
new Configuration(), new Configuration(), Tasks.Receiver.class.getName(),
- Collections.<PartitionDeploymentDescriptor>emptyList(),
- Collections.<PartitionConsumerDeploymentDescriptor>emptyList(),
+ Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+ Collections.<InputGateDeploymentDescriptor>emptyList(),
new ArrayList<BlobKey>(), 0);
new Within(duration("1 second")){
@@ -310,24 +312,24 @@ public class TaskManagerTest {
IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
- List<PartitionDeploymentDescriptor> irpdd = new ArrayList<PartitionDeploymentDescriptor>();
- irpdd.add(new PartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, IntermediateResultPartitionType.PIPELINED, 1));
+ List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>();
+ irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1));
- PartitionConsumerDeploymentDescriptor ircdd =
- new PartitionConsumerDeploymentDescriptor(
+ InputGateDeploymentDescriptor ircdd =
+ new InputGateDeploymentDescriptor(
new IntermediateDataSetID(),
- new PartitionInfo[]{
- new PartitionInfo(partitionId, eid1, PartitionInfo.PartitionLocation.LOCAL, null)
- },
- 0);
+ 0, new InputChannelDeploymentDescriptor[]{
+ new InputChannelDeploymentDescriptor(new ResultPartitionID(partitionId, eid1), ResultPartitionLocation.createLocal())
+ }
+ );
final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1,
new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
- irpdd, Collections.<PartitionConsumerDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(), 0);
+ irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(), 0);
final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7,
new Configuration(), new Configuration(), Tasks.Receiver.class.getName(),
- Collections.<PartitionDeploymentDescriptor>emptyList(),
+ Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
Collections.singletonList(ircdd),
new ArrayList<BlobKey>(), 0);
@@ -402,25 +404,25 @@ public class TaskManagerTest {
IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
- List<PartitionDeploymentDescriptor> irpdd = new ArrayList<PartitionDeploymentDescriptor>();
- irpdd.add(new PartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, IntermediateResultPartitionType.PIPELINED, 1));
+ List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>();
+ irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1));
- PartitionConsumerDeploymentDescriptor ircdd =
- new PartitionConsumerDeploymentDescriptor(
+ InputGateDeploymentDescriptor ircdd =
+ new InputGateDeploymentDescriptor(
new IntermediateDataSetID(),
- new PartitionInfo[]{
- new PartitionInfo(partitionId, eid1, PartitionInfo.PartitionLocation.LOCAL, null)
- },
- 0);
+ 0, new InputChannelDeploymentDescriptor[]{
+ new InputChannelDeploymentDescriptor(new ResultPartitionID(partitionId, eid1), ResultPartitionLocation.createLocal())
+ }
+ );
final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1,
new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
- irpdd, Collections.<PartitionConsumerDeploymentDescriptor>emptyList(),
+ irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(),
new ArrayList<BlobKey>(), 0);
final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7,
new Configuration(), new Configuration(), Tasks.BlockingReceiver.class.getName(),
- Collections.<PartitionDeploymentDescriptor>emptyList(),
+ Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
Collections.singletonList(ircdd),
new ArrayList<BlobKey>(), 0);