You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/02 08:42:37 UTC
[3/6] flink git commit: [FLINK-5169] [network] Adjust tests to new consumer logic
[FLINK-5169] [network] Adjust tests to new consumer logic
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d3ac0adf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d3ac0adf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d3ac0adf
Branch: refs/heads/master
Commit: d3ac0adfd7ed8878f0c80e0c454c580969e40cfc
Parents: f728129
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon Nov 28 09:59:58 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Dec 1 21:42:49 2016 +0100
----------------------------------------------------------------------
.../runtime/io/disk/SpillingBufferTest.java | 40 ++--
.../iomanager/BufferFileWriterReaderTest.java | 1 -
.../io/network/api/reader/BufferReaderTest.java | 115 -----------
.../netty/CancelPartitionRequestTest.java | 37 ++--
.../netty/PartitionRequestQueueTest.java | 23 ++-
.../netty/ServerTransportErrorHandlingTest.java | 54 +++---
.../PartialConsumePipelinedResultTest.java | 18 +-
.../partition/PipelinedSubpartitionTest.java | 118 +++---------
.../network/partition/ResultPartitionTest.java | 1 -
.../partition/SpillableSubpartitionTest.java | 20 +-
.../SpilledSubpartitionViewAsyncIOTest.java | 65 -------
.../SpilledSubpartitionViewSyncIOTest.java | 103 ----------
.../partition/SpilledSubpartitionViewTest.java | 192 ++++++++++++-------
.../network/partition/SubpartitionTestBase.java | 10 +-
.../partition/consumer/InputChannelTest.java | 13 +-
.../IteratorWrappingTestSingleInputGate.java | 23 +--
.../consumer/LocalInputChannelTest.java | 84 ++++----
.../consumer/RemoteInputChannelTest.java | 2 +-
.../partition/consumer/SingleInputGateTest.java | 78 +++-----
.../partition/consumer/TestInputChannel.java | 32 +---
.../partition/consumer/TestSingleInputGate.java | 101 ++--------
.../partition/consumer/UnionInputGateTest.java | 43 +++--
.../network/util/TestSubpartitionConsumer.java | 69 ++++---
.../runtime/operators/DataSinkTaskTest.java | 6 +-
.../operators/chaining/ChainTaskTest.java | 12 +-
.../operators/testutils/MockEnvironment.java | 12 +-
.../operators/testutils/TaskTestBase.java | 2 +-
.../TaskCancelAsyncProducerConsumerITCase.java | 34 +---
.../consumer/StreamTestSingleInputGate.java | 46 +++--
.../io/BarrierBufferMassiveRandomTest.java | 17 +-
.../streaming/runtime/io/MockInputGate.java | 28 +--
.../tasks/OneInputStreamTaskTestHarness.java | 27 +--
.../runtime/tasks/StreamMockEnvironment.java | 21 +-
.../StreamTaskCancellationBarrierTest.java | 2 -
.../runtime/tasks/StreamTaskTestHarness.java | 14 +-
.../runtime/tasks/TwoInputStreamTaskTest.java | 6 +-
.../tasks/TwoInputStreamTaskTestHarness.java | 1 -
37 files changed, 554 insertions(+), 916 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
index 538c416..01a9723 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
@@ -109,7 +109,7 @@ public class SpillingBufferTest {
DataInputView inView = outView.flip();
generator.reset();
- // read and re-generate all records and compare them
+ // notifyNonEmpty and re-generate all records and compare them
final Tuple2<Integer, String> readRec = new Tuple2<>();
for (int i = 0; i < NUM_PAIRS_INMEM; i++) {
generator.next(rec);
@@ -121,14 +121,14 @@ public class SpillingBufferTest {
int k2 = readRec.f0;
String v2 = readRec.f1;
- Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
+ Assert.assertTrue("The re-generated and the notifyNonEmpty record do not match.", k1 == k2 && v1.equals(v2));
}
- // re-read the data
+ // re-notifyNonEmpty the data
inView = outView.flip();
generator.reset();
- // read and re-generate all records and compare them
+ // notifyNonEmpty and re-generate all records and compare them
for (int i = 0; i < NUM_PAIRS_INMEM; i++) {
generator.next(rec);
serializer.deserialize(readRec, inView);
@@ -139,7 +139,7 @@ public class SpillingBufferTest {
int k2 = readRec.f0;
String v2 = readRec.f1;
- Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
+ Assert.assertTrue("The re-generated and the notifyNonEmpty record do not match.", k1 == k2 && v1.equals(v2));
}
this.memoryManager.release(outView.close());
@@ -169,7 +169,7 @@ public class SpillingBufferTest {
DataInputView inView = outView.flip();
generator.reset();
- // read and re-generate all records and compare them
+ // notifyNonEmpty and re-generate all records and compare them
final Tuple2<Integer, String> readRec = new Tuple2<>();
try {
for (int i = 0; i < NUM_PAIRS_INMEM + 1; i++) {
@@ -182,7 +182,7 @@ public class SpillingBufferTest {
int k2 = readRec.f0;
String v2 = readRec.f1;
- Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
+ Assert.assertTrue("The re-generated and the notifyNonEmpty record do not match.", k1 == k2 && v1.equals(v2));
}
Assert.fail("Read too much, expected EOFException.");
}
@@ -190,11 +190,11 @@ public class SpillingBufferTest {
// expected
}
- // re-read the data
+ // re-notifyNonEmpty the data
inView = outView.flip();
generator.reset();
- // read and re-generate all records and compare them
+ // notifyNonEmpty and re-generate all records and compare them
for (int i = 0; i < NUM_PAIRS_INMEM; i++) {
generator.next(rec);
serializer.deserialize(readRec, inView);
@@ -205,7 +205,7 @@ public class SpillingBufferTest {
int k2 = readRec.f0;
String v2 = readRec.f1;
- Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
+ Assert.assertTrue("The re-generated and the notifyNonEmpty record do not match.", k1 == k2 && v1.equals(v2));
}
this.memoryManager.release(outView.close());
@@ -237,7 +237,7 @@ public class SpillingBufferTest {
DataInputView inView = outView.flip();
generator.reset();
- // read and re-generate all records and compare them
+ // notifyNonEmpty and re-generate all records and compare them
final Tuple2<Integer, String> readRec = new Tuple2<>();
for (int i = 0; i < NUM_PAIRS_EXTERNAL; i++) {
generator.next(rec);
@@ -249,14 +249,14 @@ public class SpillingBufferTest {
int k2 = readRec.f0;
String v2 = readRec.f1;
- Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
+ Assert.assertTrue("The re-generated and the notifyNonEmpty record do not match.", k1 == k2 && v1.equals(v2));
}
- // re-read the data
+ // re-notifyNonEmpty the data
inView = outView.flip();
generator.reset();
- // read and re-generate all records and compare them
+ // notifyNonEmpty and re-generate all records and compare them
for (int i = 0; i < NUM_PAIRS_EXTERNAL; i++) {
generator.next(rec);
serializer.deserialize(readRec, inView);
@@ -267,7 +267,7 @@ public class SpillingBufferTest {
int k2 = readRec.f0;
String v2 = readRec.f1;
- Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
+ Assert.assertTrue("The re-generated and the notifyNonEmpty record do not match.", k1 == k2 && v1.equals(v2));
}
this.memoryManager.release(outView.close());
@@ -297,7 +297,7 @@ public class SpillingBufferTest {
DataInputView inView = outView.flip();
generator.reset();
- // read and re-generate all records and compare them
+ // notifyNonEmpty and re-generate all records and compare them
final Tuple2<Integer, String> readRec = new Tuple2<>();
try {
for (int i = 0; i < NUM_PAIRS_EXTERNAL + 1; i++) {
@@ -310,7 +310,7 @@ public class SpillingBufferTest {
int k2 = readRec.f0;
String v2 = readRec.f1;
- Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
+ Assert.assertTrue("The re-generated and the notifyNonEmpty record do not match.", k1 == k2 && v1.equals(v2));
}
Assert.fail("Read too much, expected EOFException.");
}
@@ -318,11 +318,11 @@ public class SpillingBufferTest {
// expected
}
- // re-read the data
+ // re-notifyNonEmpty the data
inView = outView.flip();
generator.reset();
- // read and re-generate all records and compare them
+ // notifyNonEmpty and re-generate all records and compare them
for (int i = 0; i < NUM_PAIRS_EXTERNAL; i++) {
generator.next(rec);
serializer.deserialize(readRec, inView);
@@ -333,7 +333,7 @@ public class SpillingBufferTest {
int k2 = readRec.f0;
String v2 = readRec.f1;
- Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
+ Assert.assertTrue("The re-generated and the notifyNonEmpty record do not match.", k1 == k2 && v1.equals(v2));
}
this.memoryManager.release(outView.close());
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
index 375be45..2da0f7e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.testutils.DiscardingRecycler;
-
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
deleted file mode 100644
index 099b6fb..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.api.reader;
-
-import org.apache.flink.runtime.event.TaskEvent;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.consumer.TestSingleInputGate;
-import org.apache.flink.runtime.io.network.util.TestTaskEvent;
-import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(Task.class)
-@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
-@SuppressWarnings("unchecked")
-public class BufferReaderTest {
-
- @Test
- public void testGetNextBufferOrEvent() throws IOException, InterruptedException {
-
- final TestSingleInputGate inputGate = new TestSingleInputGate(1)
- .readBuffer().readBuffer().readEvent()
- .readBuffer().readBuffer().readEvent()
- .readBuffer().readEndOfPartitionEvent();
-
- final BufferReader reader = new BufferReader(inputGate.getInputGate());
-
- // Task event listener to be notified...
- final EventListener<TaskEvent> listener = mock(EventListener.class);
- reader.registerTaskEventListener(listener, TestTaskEvent.class);
-
- int numReadBuffers = 0;
- while ((reader.getNextBuffer()) != null) {
- numReadBuffers++;
- }
-
- assertEquals(5, numReadBuffers);
- verify(listener, times(2)).onEvent(any(TaskEvent.class));
- }
-
- @Test
- public void testIterativeGetNextBufferOrEvent() throws IOException, InterruptedException {
-
- final TestSingleInputGate inputGate = new TestSingleInputGate(1)
- .readBuffer().readBuffer().readEvent()
- .readBuffer().readBuffer().readEvent()
- .readBuffer().readEndOfSuperstepEvent()
- .readBuffer().readBuffer().readEvent()
- .readBuffer().readBuffer().readEvent()
- .readBuffer().readEndOfPartitionEvent();
-
- final BufferReader reader = new BufferReader(inputGate.getInputGate());
-
- // Set reader iterative
- reader.setIterativeReader();
-
- // Task event listener to be notified...
- final EventListener<TaskEvent> listener = mock(EventListener.class);
- // Task event listener to be notified...
- reader.registerTaskEventListener(listener, TestTaskEvent.class);
-
- int numReadBuffers = 0;
- int numEndOfSuperstepEvents = 0;
-
- while (true) {
- Buffer buffer = reader.getNextBuffer();
-
- if (buffer != null) {
- numReadBuffers++;
- }
- else if (reader.hasReachedEndOfSuperstep()) {
- reader.startNextSuperstep();
-
- numEndOfSuperstepEvents++;
- }
- else if (reader.isFinished()) {
- break;
- }
- }
-
- assertEquals(10, numReadBuffers);
- assertEquals(1, numEndOfSuperstepEvents);
-
- verify(listener, times(4)).onEvent(any(TaskEvent.class));
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
index 1ff1e99..a2f866a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
@@ -24,14 +24,16 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.util.event.NotificationListener;
import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
@@ -73,11 +75,18 @@ public class CancelPartitionRequestTest {
CountDownLatch sync = new CountDownLatch(1);
- ResultSubpartitionView view = spy(new InfiniteSubpartitionView(outboundBuffers, sync));
+ final ResultSubpartitionView view = spy(new InfiniteSubpartitionView(outboundBuffers, sync));
// Return infinite subpartition
- when(partitions.createSubpartitionView(eq(pid), eq(0), any(BufferProvider.class)))
- .thenReturn(view);
+ when(partitions.createSubpartitionView(eq(pid), eq(0), any(BufferProvider.class), any(BufferAvailabilityListener.class)))
+ .thenAnswer(new Answer<ResultSubpartitionView>() {
+ @Override
+ public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws Throwable {
+ BufferAvailabilityListener listener = (BufferAvailabilityListener) invocationOnMock.getArguments()[3];
+ listener.notifyBuffersAvailable(Long.MAX_VALUE);
+ return view;
+ }
+ });
PartitionRequestProtocol protocol = new PartitionRequestProtocol(
partitions, mock(TaskEventDispatcher.class), mock(NetworkBufferPool.class));
@@ -109,19 +118,26 @@ public class CancelPartitionRequestTest {
NettyServerAndClient serverAndClient = null;
try {
- TestPooledBufferProvider outboundBuffers = new TestPooledBufferProvider(16);
+ final TestPooledBufferProvider outboundBuffers = new TestPooledBufferProvider(16);
ResultPartitionManager partitions = mock(ResultPartitionManager.class);
ResultPartitionID pid = new ResultPartitionID();
- CountDownLatch sync = new CountDownLatch(1);
+ final CountDownLatch sync = new CountDownLatch(1);
- ResultSubpartitionView view = spy(new InfiniteSubpartitionView(outboundBuffers, sync));
+ final ResultSubpartitionView view = spy(new InfiniteSubpartitionView(outboundBuffers, sync));
// Return infinite subpartition
- when(partitions.createSubpartitionView(eq(pid), eq(0), any(BufferProvider.class)))
- .thenReturn(view);
+ when(partitions.createSubpartitionView(eq(pid), eq(0), any(BufferProvider.class), any(BufferAvailabilityListener.class)))
+ .thenAnswer(new Answer<ResultSubpartitionView>() {
+ @Override
+ public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws Throwable {
+ BufferAvailabilityListener listener = (BufferAvailabilityListener) invocationOnMock.getArguments()[3];
+ listener.notifyBuffersAvailable(Long.MAX_VALUE);
+ return view;
+ }
+ });
PartitionRequestProtocol protocol = new PartitionRequestProtocol(
partitions, mock(TaskEventDispatcher.class), mock(NetworkBufferPool.class));
@@ -174,8 +190,7 @@ public class CancelPartitionRequestTest {
}
@Override
- public boolean registerListener(final NotificationListener listener) throws IOException {
- return false;
+ public void notifyBuffersAvailable(long buffers) throws IOException {
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
index 3f281bd..7224e96 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
@@ -20,12 +20,18 @@ package org.apache.flink.runtime.io.network.netty;
import io.netty.channel.embedded.EmbeddedChannel;
import org.apache.flink.runtime.execution.CancelTaskException;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -35,14 +41,27 @@ public class PartitionRequestQueueTest {
public void testProducerFailedException() throws Exception {
PartitionRequestQueue queue = new PartitionRequestQueue();
- EmbeddedChannel ch = new EmbeddedChannel(queue);
+ ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class);
+ ResultPartitionID rpid = new ResultPartitionID();
+ BufferProvider bufferProvider = mock(BufferProvider.class);
ResultSubpartitionView view = mock(ResultSubpartitionView.class);
when(view.isReleased()).thenReturn(true);
when(view.getFailureCause()).thenReturn(new RuntimeException("Expected test exception"));
+ when(partitionProvider.createSubpartitionView(
+ eq(rpid),
+ eq(0),
+ eq(bufferProvider),
+ any(BufferAvailabilityListener.class))).thenReturn(view);
+
+ EmbeddedChannel ch = new EmbeddedChannel(queue);
+
+ SequenceNumberingViewReader seqView = new SequenceNumberingViewReader(new InputChannelID(), queue);
+ seqView.requestSubpartitionView(partitionProvider, rpid, 0, bufferProvider);
+
// Enqueue the erroneous view
- queue.enqueue(view, new InputChannelID());
+ queue.notifyReaderNonEmpty(seqView);
ch.runPendingTasks();
// Read the enqueued msg
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
index 1515f83..1c3557e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
@@ -25,20 +25,20 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.netty.CancelPartitionRequestTest.InfiniteSubpartitionView;
-import org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import static org.apache.flink.runtime.io.network.netty.NettyMessage.NettyMessageEncoder;
-import static org.apache.flink.runtime.io.network.netty.NettyMessage.PartitionRequest;
import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.connect;
import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.createConfig;
import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient;
@@ -63,36 +63,43 @@ public class ServerTransportErrorHandlingTest {
final ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
when(partitionManager
- .createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class)))
- .thenReturn(new InfiniteSubpartitionView(outboundBuffers, sync));
+ .createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class), any(BufferAvailabilityListener.class)))
+ .thenAnswer(new Answer<ResultSubpartitionView>() {
+ @Override
+ public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws Throwable {
+ BufferAvailabilityListener listener = (BufferAvailabilityListener) invocationOnMock.getArguments()[3];
+ listener.notifyBuffersAvailable(Long.MAX_VALUE);
+ return new CancelPartitionRequestTest.InfiniteSubpartitionView(outboundBuffers, sync);
+ }
+ });
NettyProtocol protocol = new NettyProtocol() {
@Override
public ChannelHandler[] getServerChannelHandlers() {
return new PartitionRequestProtocol(
- partitionManager,
- mock(TaskEventDispatcher.class),
- mock(NetworkBufferPool.class)).getServerChannelHandlers();
+ partitionManager,
+ mock(TaskEventDispatcher.class),
+ mock(NetworkBufferPool.class)).getServerChannelHandlers();
}
@Override
public ChannelHandler[] getClientChannelHandlers() {
- return new ChannelHandler[] {
- new NettyMessageEncoder(),
- // Close on read
- new ChannelInboundHandlerAdapter() {
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg)
- throws Exception {
-
- ctx.channel().close();
- }
+ return new ChannelHandler[]{
+ new NettyMessage.NettyMessageEncoder(),
+ // Close on read
+ new ChannelInboundHandlerAdapter() {
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg)
+ throws Exception {
+
+ ctx.channel().close();
}
+ }
};
}
};
- NettyServerAndClient serverAndClient = null;
+ NettyTestUtil.NettyServerAndClient serverAndClient = null;
try {
serverAndClient = initServerAndClient(protocol, createConfig());
@@ -100,15 +107,14 @@ public class ServerTransportErrorHandlingTest {
Channel ch = connect(serverAndClient);
// Write something to trigger close by server
- ch.writeAndFlush(new PartitionRequest(new ResultPartitionID(), 0, new InputChannelID()));
+ ch.writeAndFlush(new NettyMessage.PartitionRequest(new ResultPartitionID(), 0, new InputChannelID()));
// Wait for the notification
if (!sync.await(TestingUtils.TESTING_DURATION().toMillis(), TimeUnit.MILLISECONDS)) {
fail("Timed out after waiting for " + TestingUtils.TESTING_DURATION().toMillis() +
- " ms to be notified about released partition.");
+ " ms to be notified about released partition.");
}
- }
- finally {
+ } finally {
shutdown(serverAndClient);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index af8aa69..1ec4ad3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -21,12 +21,12 @@ package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.io.network.api.reader.BufferReader;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.testingUtils.TestingCluster;
@@ -87,12 +87,12 @@ public class PartialConsumePipelinedResultTest {
// The partition needs to be pipelined, otherwise the original issue does not occur, because
// the sender and receiver are not online at the same time.
receiver.connectNewDataSetAsInput(
- sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+ sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
final JobGraph jobGraph = new JobGraph("Partial Consume of Pipelined Result", sender, receiver);
final SlotSharingGroup slotSharingGroup = new SlotSharingGroup(
- sender.getID(), receiver.getID());
+ sender.getID(), receiver.getID());
sender.setSlotSharingGroup(slotSharingGroup);
receiver.setSlotSharingGroup(slotSharingGroup);
@@ -127,11 +127,11 @@ public class PartialConsumePipelinedResultTest {
@Override
public void invoke() throws Exception {
- final BufferReader reader = new BufferReader(getEnvironment().getInputGate(0));
-
- final Buffer buffer = reader.getNextBuffer();
-
- buffer.recycle();
+ InputGate gate = getEnvironment().getInputGate(0);
+ Buffer buffer = gate.getNextBufferOrEvent().getBuffer();
+ if (buffer != null) {
+ buffer.recycle();
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 8750a1a..a56177e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
-import org.apache.flink.runtime.io.network.util.TestNotificationListener;
import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
import org.apache.flink.runtime.io.network.util.TestProducerSource;
import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
@@ -38,12 +37,13 @@ import java.util.concurrent.Future;
import static org.apache.flink.runtime.io.network.util.TestBufferFactory.createBuffer;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
public class PipelinedSubpartitionTest extends SubpartitionTestBase {
@@ -63,80 +63,25 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
}
@Test
- public void testRegisterListener() throws Exception {
- final PipelinedSubpartition subpartition = createSubpartition();
-
- final TestNotificationListener listener = new TestNotificationListener();
-
- // Register a listener
- assertTrue(subpartition.registerListener(listener));
-
- // Try to register another listener
- try {
- subpartition.registerListener(listener);
-
- fail("Did not throw expected exception after duplicate listener registration.");
- }
- catch (IllegalStateException expected) {
- }
- }
-
- @Test
- public void testListenerNotification() throws Exception {
- final TestNotificationListener listener = new TestNotificationListener();
- assertEquals(0, listener.getNumberOfNotifications());
-
- {
- final PipelinedSubpartition subpartition = createSubpartition();
-
- // Register a listener
- assertTrue(subpartition.registerListener(listener));
-
- // Notify on add and remove listener
- subpartition.add(mock(Buffer.class));
- assertEquals(1, listener.getNumberOfNotifications());
-
- // No notification, should have removed listener after first notification
- subpartition.add(mock(Buffer.class));
- assertEquals(1, listener.getNumberOfNotifications());
- }
-
- {
- final PipelinedSubpartition subpartition = createSubpartition();
-
- // Register a listener
- assertTrue(subpartition.registerListener(listener));
-
- // Notify on finish
- subpartition.finish();
- assertEquals(2, listener.getNumberOfNotifications());
- }
-
- {
- final PipelinedSubpartition subpartition = createSubpartition();
-
- // Register a listener
- assertTrue(subpartition.registerListener(listener));
-
- // Notify on release
- subpartition.release();
- assertEquals(3, listener.getNumberOfNotifications());
- }
- }
-
- @Test
public void testIllegalReadViewRequest() throws Exception {
final PipelinedSubpartition subpartition = createSubpartition();
// Successful request
- assertNotNull(subpartition.createReadView(null));
+ assertNotNull(subpartition.createReadView(null, new BufferAvailabilityListener() {
+ @Override
+ public void notifyBuffersAvailable(long numBuffers) {
+ }
+ }));
try {
- subpartition.createReadView(null);
+ subpartition.createReadView(null, new BufferAvailabilityListener() {
+ @Override
+ public void notifyBuffersAvailable(long numBuffers) {
+ }
+ });
- fail("Did not throw expected exception after duplicate read view request.");
- }
- catch (IllegalStateException expected) {
+ fail("Did not throw expected exception after duplicate notifyNonEmpty view request.");
+ } catch (IllegalStateException expected) {
}
}
@@ -144,23 +89,19 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
public void testBasicPipelinedProduceConsumeLogic() throws Exception {
final PipelinedSubpartition subpartition = createSubpartition();
- TestNotificationListener listener = new TestNotificationListener();
+ BufferAvailabilityListener listener = mock(BufferAvailabilityListener.class);
- ResultSubpartitionView view = subpartition.createReadView(null);
+ ResultSubpartitionView view = subpartition.createReadView(null, listener);
// Empty => should return null
assertNull(view.getNextBuffer());
-
- // Register listener for notifications
- assertTrue(view.registerListener(listener));
-
- assertEquals(0, listener.getNumberOfNotifications());
+ verify(listener, times(1)).notifyBuffersAvailable(eq(0L));
// Add data to the queue...
subpartition.add(createBuffer());
// ...should have resulted in a notification
- assertEquals(1, listener.getNumberOfNotifications());
+ verify(listener, times(1)).notifyBuffersAvailable(eq(1L));
// ...and one available result
assertNotNull(view.getNextBuffer());
@@ -168,10 +109,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
// Add data to the queue...
subpartition.add(createBuffer());
- // ...don't allow to subscribe, if data is available
- assertFalse(view.registerListener(listener));
-
- assertEquals(1, listener.getNumberOfNotifications());
+ verify(listener, times(2)).notifyBuffersAvailable(eq(1L));
}
@Test
@@ -208,7 +146,6 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
@Override
public BufferOrEvent getNextBufferOrEvent() throws Exception {
-
if (numberOfBuffers == producerNumberOfBuffersToProduce) {
return null;
}
@@ -261,16 +198,17 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
final PipelinedSubpartition subpartition = createSubpartition();
- final PipelinedSubpartitionView view = subpartition.createReadView(null);
+ TestSubpartitionConsumer consumer = new TestSubpartitionConsumer(isSlowConsumer, consumerCallback);
+ final PipelinedSubpartitionView view = subpartition.createReadView(null, consumer);
+ consumer.setSubpartitionView(view);
- Future<Boolean> producer = executorService.submit(
- new TestSubpartitionProducer(subpartition, isSlowProducer, producerSource));
+ Future<Boolean> producerResult = executorService.submit(
+ new TestSubpartitionProducer(subpartition, isSlowProducer, producerSource));
- Future<Boolean> consumer = executorService.submit(
- new TestSubpartitionConsumer(view, isSlowConsumer, consumerCallback));
+ Future<Boolean> consumerResult = executorService.submit(consumer);
// Wait for producer and consumer to finish
- producer.get();
- consumer.get();
+ producerResult.get();
+ consumerResult.get();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index f6fddfa..4eb4fd1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -86,7 +86,6 @@ public class ResultPartitionTest {
mock(ResultPartitionManager.class),
notifier,
mock(IOManager.class),
- IOManager.IOMode.SYNC,
sendScheduleOrUpdateConsumersMessage);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index d7e56c8..b7a54d7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
import org.junit.AfterClass;
import org.junit.Test;
@@ -34,7 +35,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import static org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode.SYNC;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
@@ -59,7 +60,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
@Override
ResultSubpartition createSubpartition() {
- return new SpillableSubpartition(0, mock(ResultPartition.class), ioManager, SYNC);
+ return new SpillableSubpartition(0, mock(ResultPartition.class), ioManager);
}
/**
@@ -87,14 +88,14 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
// Mock I/O manager returning the blocking spill writer
IOManager ioManager = mock(IOManager.class);
when(ioManager.createBufferFileWriter(any(FileIOChannel.ID.class)))
- .thenReturn(spillWriter);
+ .thenReturn(spillWriter);
// The partition
final SpillableSubpartition partition = new SpillableSubpartition(
- 0, mock(ResultPartition.class), ioManager, SYNC);
+ 0, mock(ResultPartition.class), ioManager);
// Spill the partition initially (creates the spill writer)
- partition.releaseMemory();
+ assertEquals(0, partition.releaseMemory());
ExecutorService executor = Executors.newSingleThreadExecutor();
@@ -130,13 +131,18 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
public void testReleasePartitionAndGetNext() throws Exception {
// Create partition and add some buffers
SpillableSubpartition partition = new SpillableSubpartition(
- 0, mock(ResultPartition.class), ioManager, SYNC);
+ 0, mock(ResultPartition.class), ioManager);
partition.finish();
// Create the read view
ResultSubpartitionView readView = spy(partition
- .createReadView(new TestInfiniteBufferProvider()));
+ .createReadView(new TestInfiniteBufferProvider(), new BufferAvailabilityListener() {
+ @Override
+ public void notifyBuffersAvailable(long numBuffers) {
+
+ }
+ }));
// The released state check (of the parent) needs to be independent
// of the released state of the view.
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIOTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIOTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIOTest.java
deleted file mode 100644
index 981c8ee..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIOTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition;
-
-import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
-import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
-import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
-import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
-import org.junit.AfterClass;
-import org.junit.Test;
-
-import static org.mockito.Mockito.mock;
-
-public class SpilledSubpartitionViewAsyncIOTest {
-
- private static final IOManager ioManager = new IOManagerAsync();
-
- @AfterClass
- public static void shutdown() {
- ioManager.shutdown();
- }
-
- @Test
- public void testWriteConsume() throws Exception {
- // Config
- final int numberOfBuffersToWrite = 1024;
-
- // Setup
- final BufferFileWriter writer = SpilledSubpartitionViewTest
- .createWriterAndWriteBuffers(ioManager, new TestInfiniteBufferProvider(), numberOfBuffersToWrite);
-
- writer.close();
-
- final TestPooledBufferProvider viewBufferPool = new TestPooledBufferProvider(1);
-
- final SpilledSubpartitionViewAsyncIO view = new SpilledSubpartitionViewAsyncIO(
- mock(ResultSubpartition.class), viewBufferPool, ioManager,
- writer.getChannelID(), 0);
-
- final TestSubpartitionConsumer consumer = new TestSubpartitionConsumer(view, false,
- new TestConsumerCallback.RecyclingCallback());
-
- // Consume subpartition
- consumer.call();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIOTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIOTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIOTest.java
deleted file mode 100644
index f8baae4..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIOTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition;
-
-import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
-import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
-import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
-import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
-import org.junit.AfterClass;
-import org.junit.Test;
-
-import static org.mockito.Mockito.mock;
-
-public class SpilledSubpartitionViewSyncIOTest {
-
- private static final IOManager ioManager = new IOManagerAsync();
-
- private static final TestInfiniteBufferProvider writerBufferPool =
- new TestInfiniteBufferProvider();
-
- @AfterClass
- public static void shutdown() {
- ioManager.shutdown();
- }
-
- @Test
- public void testWriteConsume() throws Exception {
- // Config
- final int numberOfBuffersToWrite = 512;
-
- // Setup
- final BufferFileWriter writer = SpilledSubpartitionViewTest
- .createWriterAndWriteBuffers(ioManager, writerBufferPool, numberOfBuffersToWrite);
-
- writer.close();
-
- final TestPooledBufferProvider viewBufferPool = new TestPooledBufferProvider(1);
-
- final SpilledSubpartitionViewSyncIO view = new SpilledSubpartitionViewSyncIO(
- mock(ResultSubpartition.class),
- viewBufferPool.getMemorySegmentSize(),
- writer.getChannelID(),
- 0);
-
- final TestSubpartitionConsumer consumer = new TestSubpartitionConsumer(view, false,
- new TestConsumerCallback.RecyclingCallback());
-
- // Consume subpartition
- consumer.call();
- }
-
- @Test
- public void testConsumeWithFewBuffers() throws Exception {
- // Config
- final int numberOfBuffersToWrite = 512;
-
- // Setup
- final BufferFileWriter writer = SpilledSubpartitionViewTest
- .createWriterAndWriteBuffers(ioManager, writerBufferPool, numberOfBuffersToWrite);
-
- writer.close();
-
- final SpilledSubpartitionViewSyncIO view = new SpilledSubpartitionViewSyncIO(
- mock(ResultSubpartition.class),
- 32 * 1024,
- writer.getChannelID(),
- 0);
-
- // No buffer available, don't deadlock. We need to make progress in situations when the view
- // is consumed at an input gate with local and remote channels. The remote channels might
- // eat up all the buffers, at which point the spilled view will not have any buffers
- // available and the input gate can't make any progress if we don't return immediately.
- //
- // The current solution is straight-forward with a separate buffer per spilled subpartition,
- // but introduces memory-overhead.
- //
- // TODO Replace with asynchronous buffer pool request as this introduces extra buffers per
- // consumed subpartition.
- final TestSubpartitionConsumer consumer = new TestSubpartitionConsumer(view, false,
- new TestConsumerCallback.RecyclingCallback());
-
- consumer.call();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
index 5722cac..8f8da93 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
@@ -18,26 +18,21 @@
package org.apache.flink.runtime.io.network.partition;
+import com.google.common.collect.Lists;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.util.TestConsumerCallback.RecyclingCallback;
+import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
import org.junit.AfterClass;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -47,55 +42,103 @@ import java.util.concurrent.TimeoutException;
import static org.mockito.Mockito.mock;
-/**
- * Test for both the asynchronous and synchronous spilled subpartition view implementation.
- */
-@RunWith(Parameterized.class)
public class SpilledSubpartitionViewTest {
- private static final IOManager ioManager = new IOManagerAsync();
-
- private static final ExecutorService executor = Executors.newCachedThreadPool();
+ private static final IOManager IO_MANAGER = new IOManagerAsync();
private static final TestInfiniteBufferProvider writerBufferPool =
- new TestInfiniteBufferProvider();
-
- private IOMode ioMode;
-
- public SpilledSubpartitionViewTest(IOMode ioMode) {
- this.ioMode = ioMode;
- }
+ new TestInfiniteBufferProvider();
@AfterClass
public static void shutdown() {
- ioManager.shutdown();
- executor.shutdown();
+ IO_MANAGER.shutdown();
}
- @Parameterized.Parameters
- public static Collection<Object[]> ioMode() {
- return Arrays.asList(new Object[][]{
- {IOMode.SYNC},
- {IOMode.ASYNC}});
+ @Test
+ public void testWriteConsume() throws Exception {
+ // Config
+ final int numberOfBuffersToWrite = 512;
+
+ // Setup
+ final BufferFileWriter writer = createWriterAndWriteBuffers(IO_MANAGER, writerBufferPool, numberOfBuffersToWrite);
+
+ writer.close();
+
+ TestPooledBufferProvider viewBufferPool = new TestPooledBufferProvider(1);
+
+ TestSubpartitionConsumer consumer = new TestSubpartitionConsumer(
+ false, new TestConsumerCallback.RecyclingCallback());
+
+ SpilledSubpartitionView view = new SpilledSubpartitionView(
+ mock(ResultSubpartition.class),
+ viewBufferPool.getMemorySegmentSize(),
+ writer,
+ numberOfBuffersToWrite + 1, // +1 for end-of-partition
+ consumer);
+
+ consumer.setSubpartitionView(view);
+
+ // Consume subpartition
+ consumer.call();
}
@Test
- public void testReadMultipleFilesWithSingleBufferPool() throws Exception {
+ public void testConsumeWithFewBuffers() throws Exception {
+ // Config
+ final int numberOfBuffersToWrite = 512;
+
// Setup
- BufferFileWriter[] writers = new BufferFileWriter[]{
- createWriterAndWriteBuffers(ioManager, writerBufferPool, 512),
- createWriterAndWriteBuffers(ioManager, writerBufferPool, 512)
- };
+ final BufferFileWriter writer = createWriterAndWriteBuffers(IO_MANAGER, writerBufferPool, numberOfBuffersToWrite);
+
+ writer.close();
+
+ TestSubpartitionConsumer consumer = new TestSubpartitionConsumer(
+ false, new TestConsumerCallback.RecyclingCallback());
+
+ SpilledSubpartitionView view = new SpilledSubpartitionView(
+ mock(ResultSubpartition.class),
+ 32 * 1024,
+ writer,
+ numberOfBuffersToWrite + 1,
+ consumer);
+
+ consumer.setSubpartitionView(view);
+
+ // No buffer available, don't deadlock. We need to make progress in situations when the view
+ // is consumed at an input gate with local and remote channels. The remote channels might
+ // eat up all the buffers, at which point the spilled view will not have any buffers
+ // available and the input gate can't make any progress if we don't return immediately.
+ //
+ // The current solution is straight-forward with a separate buffer per spilled subpartition,
+ // but introduces memory-overhead.
+ //
+ // TODO Replace with asynchronous buffer pool request as this introduces extra buffers per
+ // consumed subpartition.
+ consumer.call();
+ }
- final ResultSubpartitionView[] readers = new ResultSubpartitionView[writers.length];
+ @Test
+ public void testReadMultipleFilesWithSingleBufferPool() throws Exception {
+ ExecutorService executor = null;
+ BufferFileWriter[] writers = null;
+ ResultSubpartitionView[] readers = null;
- // Make this buffer pool small so that we can test the behaviour of the asynchronous view
- // with few buffers.
- final BufferProvider inputBuffers = new TestPooledBufferProvider(2);
+ try {
+ executor = Executors.newCachedThreadPool();
- final ResultSubpartition parent = mock(ResultSubpartition.class);
+ // Setup
+ writers = new BufferFileWriter[]{
+ createWriterAndWriteBuffers(IO_MANAGER, writerBufferPool, 512),
+ createWriterAndWriteBuffers(IO_MANAGER, writerBufferPool, 512)
+ };
+
+ readers = new ResultSubpartitionView[writers.length];
+ TestSubpartitionConsumer[] consumers = new TestSubpartitionConsumer[writers.length];
+
+ BufferProvider inputBuffers = new TestPooledBufferProvider(2);
+
+ ResultSubpartition parent = mock(ResultSubpartition.class);
- try {
// Wait for writers to finish
for (BufferFileWriter writer : writers) {
writer.close();
@@ -103,56 +146,56 @@ public class SpilledSubpartitionViewTest {
// Create the views depending on the test configuration
for (int i = 0; i < readers.length; i++) {
- if (ioMode.isSynchronous()) {
- readers[i] = new SpilledSubpartitionViewSyncIO(
- parent,
- inputBuffers.getMemorySegmentSize(),
- writers[i].getChannelID(),
- 0);
- }
- else {
- // For the asynchronous view, it is important that a registered listener will
- // eventually be notified even if the view never got a buffer to read data into.
- //
- // At runtime, multiple threads never share the same buffer pool as in test. We
- // do it here to provoke the erroneous behaviour.
- readers[i] = new SpilledSubpartitionViewAsyncIO(
- parent, inputBuffers, ioManager, writers[i].getChannelID(), 0);
- }
+ consumers[i] = new TestSubpartitionConsumer(
+ false, new TestConsumerCallback.RecyclingCallback());
+
+ readers[i] = new SpilledSubpartitionView(
+ parent,
+ inputBuffers.getMemorySegmentSize(),
+ writers[i],
+ 512 + 1, // +1 for end of partition event
+ consumers[i]);
+
+ consumers[i].setSubpartitionView(readers[i]);
}
- final List<Future<Boolean>> results = new ArrayList<>();
+ final List<Future<Boolean>> results = Lists.newArrayList();
// Submit the consuming tasks
- for (ResultSubpartitionView view : readers) {
- results.add(executor.submit(new TestSubpartitionConsumer(
- view, false, new RecyclingCallback())));
+ for (TestSubpartitionConsumer consumer : consumers) {
+ results.add(executor.submit(consumer));
}
// Wait for the results
for (Future<Boolean> res : results) {
try {
res.get(2, TimeUnit.MINUTES);
- }
- catch (TimeoutException e) {
+ } catch (TimeoutException e) {
throw new TimeoutException("There has been a timeout in the test. This " +
- "indicates that there is a bug/deadlock in the tested subpartition " +
- "view. The timed out test was in " + ioMode + " mode.");
+ "indicates that there is a bug/deadlock in the tested subpartition " +
+ "view.");
}
}
- }
- finally {
- for (BufferFileWriter writer : writers) {
- if (writer != null) {
- writer.deleteChannel();
+ } finally {
+ if (writers != null) {
+ for (BufferFileWriter writer : writers) {
+ if (writer != null) {
+ writer.deleteChannel();
+ }
}
}
- for (ResultSubpartitionView reader : readers) {
- if (reader != null) {
- reader.releaseAllResources();
+ if (readers != null) {
+ for (ResultSubpartitionView reader : readers) {
+ if (reader != null) {
+ reader.releaseAllResources();
+ }
}
}
+
+ if (executor != null) {
+ executor.shutdown();
+ }
}
}
@@ -163,9 +206,9 @@ public class SpilledSubpartitionViewTest {
* <p> Call {@link BufferFileWriter#close()} to ensure that all buffers have been written.
*/
static BufferFileWriter createWriterAndWriteBuffers(
- IOManager ioManager,
- BufferProvider bufferProvider,
- int numberOfBuffers) throws IOException {
+ IOManager ioManager,
+ BufferProvider bufferProvider,
+ int numberOfBuffers) throws IOException {
final BufferFileWriter writer = ioManager.createBufferFileWriter(ioManager.createChannel());
@@ -177,4 +220,5 @@ public class SpilledSubpartitionViewTest {
return writer;
}
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
index 26a8f29..14942bc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -50,8 +49,7 @@ public abstract class SubpartitionTestBase extends TestLogger {
subpartition.finish();
assertFalse(subpartition.add(mock(Buffer.class)));
- }
- finally {
+ } finally {
if (subpartition != null) {
subpartition.release();
}
@@ -66,8 +64,7 @@ public abstract class SubpartitionTestBase extends TestLogger {
subpartition.release();
assertFalse(subpartition.add(mock(Buffer.class)));
- }
- finally {
+ } finally {
if (subpartition != null) {
subpartition.release();
}
@@ -97,7 +94,8 @@ public abstract class SubpartitionTestBase extends TestLogger {
TestInfiniteBufferProvider buffers = new TestInfiniteBufferProvider();
// Create the view
- ResultSubpartitionView view = partition.createReadView(buffers);
+ BufferAvailabilityListener listener = mock(BufferAvailabilityListener.class);
+ ResultSubpartitionView view = partition.createReadView(buffers, listener);
// The added buffer and end-of-partition event
assertNotNull(view.getNextBuffer());
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
index 8884b29..cd75a7b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.event.TaskEvent;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.junit.Test;
@@ -114,11 +113,11 @@ public class InputChannelTest {
private static class MockInputChannel extends InputChannel {
private MockInputChannel(
- SingleInputGate inputGate,
- int channelIndex,
- ResultPartitionID partitionId,
- int initialBackoff,
- int maxBackoff) {
+ SingleInputGate inputGate,
+ int channelIndex,
+ ResultPartitionID partitionId,
+ int initialBackoff,
+ int maxBackoff) {
super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, new SimpleCounter());
}
@@ -128,7 +127,7 @@ public class InputChannelTest {
}
@Override
- Buffer getNextBuffer() throws IOException, InterruptedException {
+ BufferAndAvailability getNextBuffer() throws IOException, InterruptedException {
return null;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
index cfbe99e..fa44393 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
@@ -64,24 +64,25 @@ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> e
// The input iterator can produce an infinite stream. That's why we have to serialize each
// record on demand and cannot do it upfront.
- final Answer<Buffer> answer = new Answer<Buffer>() {
+ final Answer<InputChannel.BufferAndAvailability> answer = new Answer<InputChannel.BufferAndAvailability>() {
+
+ private boolean hasData = inputIterator.next(reuse) != null;
+
@Override
- public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable {
- if (inputIterator.next(reuse) != null) {
+ public InputChannel.BufferAndAvailability answer(InvocationOnMock invocationOnMock) throws Throwable {
+ if (hasData) {
final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(bufferSize), mock(BufferRecycler.class));
serializer.setNextBuffer(buffer);
serializer.addRecord(reuse);
- inputGate.onAvailableBuffer(inputChannel.getInputChannel());
+ hasData = inputIterator.next(reuse) != null;
// Call getCurrentBuffer to ensure size is set
- return serializer.getCurrentBuffer();
- }
- else {
-
+ return new InputChannel.BufferAndAvailability(serializer.getCurrentBuffer(), true);
+ } else {
when(inputChannel.getInputChannel().isReleased()).thenReturn(true);
- return EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+ return new InputChannel.BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false);
}
}
};
@@ -93,8 +94,8 @@ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> e
return this;
}
- public IteratorWrappingTestSingleInputGate<T> read() {
- inputGate.onAvailableBuffer(inputChannel.getInputChannel());
+ public IteratorWrappingTestSingleInputGate<T> notifyNonEmpty() {
+ inputGate.notifyChannelNonEmpty(inputChannel.getInputChannel());
return this;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 4ca1d1f..0b72f95 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
@@ -57,7 +58,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import static org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode.ASYNC;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
@@ -93,11 +93,11 @@ public class LocalInputChannelTest {
final ExecutorService executor = Executors.newFixedThreadPool(2 * parallelism);
final NetworkBufferPool networkBuffers = new NetworkBufferPool(
- (parallelism * producerBufferPoolSize) + (parallelism * parallelism),
- TestBufferFactory.BUFFER_SIZE, MemoryType.HEAP);
+ (parallelism * producerBufferPoolSize) + (parallelism * parallelism),
+ TestBufferFactory.BUFFER_SIZE, MemoryType.HEAP);
final ResultPartitionConsumableNotifier partitionConsumableNotifier =
- mock(ResultPartitionConsumableNotifier.class);
+ mock(ResultPartitionConsumableNotifier.class);
final TaskActions taskActions = mock(TaskActions.class);
@@ -124,21 +124,20 @@ public class LocalInputChannelTest {
partitionManager,
partitionConsumableNotifier,
ioManager,
- ASYNC,
true);
// Create a buffer pool for this partition
partition.registerBufferPool(
- networkBuffers.createBufferPool(producerBufferPoolSize, true));
+ networkBuffers.createBufferPool(producerBufferPoolSize, true));
// Create the producer
partitionProducers[i] = new TestPartitionProducer(
- partition,
- false,
- new TestPartitionProducerBufferSource(
- parallelism,
- partition.getBufferProvider(),
- numberOfBuffersPerChannel)
+ partition,
+ false,
+ new TestPartitionProducerBufferSource(
+ parallelism,
+ partition.getBufferProvider(),
+ numberOfBuffersPerChannel)
);
// Register with the partition manager in order to allow the local input channels to
@@ -150,7 +149,7 @@ public class LocalInputChannelTest {
try {
// Submit producer tasks
List<Future<?>> results = Lists.newArrayListWithCapacity(
- parallelism + 1);
+ parallelism + 1);
for (int i = 0; i < parallelism; i++) {
results.add(executor.submit(partitionProducers[i]));
@@ -159,14 +158,14 @@ public class LocalInputChannelTest {
// Submit consumer
for (int i = 0; i < parallelism; i++) {
results.add(executor.submit(
- new TestLocalInputChannelConsumer(
- i,
- parallelism,
- numberOfBuffersPerChannel,
- networkBuffers.createBufferPool(parallelism, true),
- partitionManager,
- new TaskEventDispatcher(),
- partitionIds)));
+ new TestLocalInputChannelConsumer(
+ i,
+ parallelism,
+ numberOfBuffersPerChannel,
+ networkBuffers.createBufferPool(parallelism, true),
+ partitionManager,
+ new TaskEventDispatcher(),
+ partitionIds)));
}
// Wait for all to finish
@@ -183,7 +182,7 @@ public class LocalInputChannelTest {
@Test
public void testPartitionRequestExponentialBackoff() throws Exception {
// Config
- Tuple2<Integer, Integer> backoff = new Tuple2<Integer, Integer>(500, 3000);
+ Tuple2<Integer, Integer> backoff = new Tuple2<>(500, 3000);
// Start with initial backoff, then keep doubling, and cap at max.
int[] expectedDelays = {backoff._1(), 1000, 2000, backoff._2()};
@@ -199,7 +198,7 @@ public class LocalInputChannelTest {
LocalInputChannel ch = createLocalInputChannel(inputGate, partitionManager, backoff);
when(partitionManager
- .createSubpartitionView(eq(ch.partitionId), eq(0), eq(bufferProvider)))
+ .createSubpartitionView(eq(ch.partitionId), eq(0), eq(bufferProvider), any(BufferAvailabilityListener.class)))
.thenThrow(new PartitionNotFoundException(ch.partitionId));
Timer timer = mock(Timer.class);
@@ -215,7 +214,7 @@ public class LocalInputChannelTest {
// Initial request
ch.requestSubpartition(0);
verify(partitionManager)
- .createSubpartitionView(eq(ch.partitionId), eq(0), eq(bufferProvider));
+ .createSubpartitionView(eq(ch.partitionId), eq(0), eq(bufferProvider), any(BufferAvailabilityListener.class));
// Request subpartition and verify that the actual requests are delayed.
for (long expected : expectedDelays) {
@@ -236,14 +235,13 @@ public class LocalInputChannelTest {
@Test(expected = CancelTaskException.class)
public void testProducerFailedException() throws Exception {
-
ResultSubpartitionView view = mock(ResultSubpartitionView.class);
when(view.isReleased()).thenReturn(true);
when(view.getFailureCause()).thenReturn(new Exception("Expected test exception"));
ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
when(partitionManager
- .createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class)))
+ .createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class), any(BufferAvailabilityListener.class)))
.thenReturn(view);
SingleInputGate inputGate = mock(SingleInputGate.class);
@@ -251,7 +249,7 @@ public class LocalInputChannelTest {
when(inputGate.getBufferProvider()).thenReturn(bufferProvider);
LocalInputChannel ch = createLocalInputChannel(
- inputGate, partitionManager, new Tuple2<Integer, Integer>(0, 0));
+ inputGate, partitionManager, new Tuple2<>(0, 0));
ch.requestSubpartition(0);
@@ -268,14 +266,14 @@ public class LocalInputChannelTest {
throws IOException, InterruptedException {
return new LocalInputChannel(
- inputGate,
- 0,
- new ResultPartitionID(),
- partitionManager,
- mock(TaskEventDispatcher.class),
- initialAndMaxRequestBackoff._1(),
- initialAndMaxRequestBackoff._2(),
- new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ inputGate,
+ 0,
+ new ResultPartitionID(),
+ partitionManager,
+ mock(TaskEventDispatcher.class),
+ initialAndMaxRequestBackoff._1(),
+ initialAndMaxRequestBackoff._2(),
+ new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
}
/**
@@ -344,14 +342,14 @@ public class LocalInputChannelTest {
checkArgument(numberOfExpectedBuffersPerChannel >= 1);
this.inputGate = new SingleInputGate(
- "Test Name",
- new JobID(),
- new ExecutionAttemptID(),
- new IntermediateDataSetID(),
- subpartitionIndex,
- numberOfInputChannels,
- mock(TaskActions.class),
- new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ "Test Name",
+ new JobID(),
+ new ExecutionAttemptID(),
+ new IntermediateDataSetID(),
+ subpartitionIndex,
+ numberOfInputChannels,
+ mock(TaskActions.class),
+ new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
// Set buffer pool
inputGate.setBufferPool(bufferPool);
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 2c2f966..1d30a9a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -70,7 +70,7 @@ public class RemoteInputChannelTest {
// Need to notify the input gate for the out-of-order buffer as well. Otherwise the
// receiving task will not notice the error.
- verify(inputGate, times(2)).onAvailableBuffer(eq(inputChannel));
+ verify(inputGate, times(2)).notifyChannelNonEmpty(eq(inputChannel));
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 0b7b10d..7cae362 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
@@ -72,18 +73,18 @@ public class SingleInputGateTest {
public void testBasicGetNextLogic() throws Exception {
// Setup
final SingleInputGate inputGate = new SingleInputGate(
- "Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ "Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
final TestInputChannel[] inputChannels = new TestInputChannel[]{
- new TestInputChannel(inputGate, 0),
- new TestInputChannel(inputGate, 1)
+ new TestInputChannel(inputGate, 0),
+ new TestInputChannel(inputGate, 1)
};
inputGate.setInputChannel(
- new IntermediateResultPartitionID(), inputChannels[0].getInputChannel());
+ new IntermediateResultPartitionID(), inputChannels[0].getInputChannel());
inputGate.setInputChannel(
- new IntermediateResultPartitionID(), inputChannels[1].getInputChannel());
+ new IntermediateResultPartitionID(), inputChannels[1].getInputChannel());
// Test
inputChannels[0].readBuffer();
@@ -92,9 +93,12 @@ public class SingleInputGateTest {
inputChannels[1].readEndOfPartitionEvent();
inputChannels[0].readEndOfPartitionEvent();
- verifyBufferOrEvent(inputGate, true, 0);
+ inputGate.notifyChannelNonEmpty(inputChannels[0].getInputChannel());
+ inputGate.notifyChannelNonEmpty(inputChannels[1].getInputChannel());
+
verifyBufferOrEvent(inputGate, true, 0);
verifyBufferOrEvent(inputGate, true, 1);
+ verifyBufferOrEvent(inputGate, true, 0);
verifyBufferOrEvent(inputGate, false, 1);
verifyBufferOrEvent(inputGate, false, 0);
@@ -111,10 +115,14 @@ public class SingleInputGateTest {
final ResultSubpartitionView iterator = mock(ResultSubpartitionView.class);
when(iterator.getNextBuffer()).thenReturn(
- new Buffer(MemorySegmentFactory.allocateUnpooledSegment(1024), mock(BufferRecycler.class)));
+ new Buffer(MemorySegmentFactory.allocateUnpooledSegment(1024), mock(BufferRecycler.class)));
final ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
- when(partitionManager.createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class))).thenReturn(iterator);
+ when(partitionManager.createSubpartitionView(
+ any(ResultPartitionID.class),
+ anyInt(),
+ any(BufferProvider.class),
+ any(BufferAvailabilityListener.class))).thenReturn(iterator);
// Setup reader with one local and one unknown input channel
final IntermediateDataSetID resultId = new IntermediateDataSetID();
@@ -143,7 +151,7 @@ public class SingleInputGateTest {
inputGate.requestPartitions();
// Only the local channel can request
- verify(partitionManager, times(1)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class));
+ verify(partitionManager, times(1)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class), any(BufferAvailabilityListener.class));
// Send event backwards and initialize unknown channel afterwards
final TaskEvent event = new TestTaskEvent();
@@ -155,7 +163,7 @@ public class SingleInputGateTest {
// After the update, the pending event should be sent to the local channel
inputGate.updateInputChannel(new InputChannelDeploymentDescriptor(new ResultPartitionID(unknownPartitionId.getPartitionId(), unknownPartitionId.getProducerId()), ResultPartitionLocation.createLocal()));
- verify(partitionManager, times(2)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class));
+ verify(partitionManager, times(2)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class), any(BufferAvailabilityListener.class));
verify(taskEventDispatcher, times(2)).publish(any(ResultPartitionID.class), any(TaskEvent.class));
}
@@ -174,8 +182,7 @@ public class SingleInputGateTest {
new IntermediateDataSetID(),
0,
1,
- mock(TaskActions.class),
- new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
@@ -186,19 +193,17 @@ public class SingleInputGateTest {
partitionManager,
new TaskEventDispatcher(),
new LocalConnectionManager(),
- 0,
- 0,
- new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ 0, 0, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown);
// Update to a local channel and verify that no request is triggered
inputGate.updateInputChannel(new InputChannelDeploymentDescriptor(
- unknown.partitionId,
- ResultPartitionLocation.createLocal()));
+ unknown.partitionId,
+ ResultPartitionLocation.createLocal()));
verify(partitionManager, never()).createSubpartitionView(
- any(ResultPartitionID.class), anyInt(), any(BufferProvider.class));
+ any(ResultPartitionID.class), anyInt(), any(BufferProvider.class), any(BufferAvailabilityListener.class));
}
/**
@@ -227,8 +232,7 @@ public class SingleInputGateTest {
new ResultPartitionManager(),
new TaskEventDispatcher(),
new LocalConnectionManager(),
- 0,
- 0,
+ 0, 0,
new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown);
@@ -249,16 +253,15 @@ public class SingleInputGateTest {
// Wait for blocking queue poll call and release input gate
boolean success = false;
for (int i = 0; i < 50; i++) {
- if (asyncConsumer != null && asyncConsumer.isAlive()) {
- StackTraceElement[] stackTrace = asyncConsumer.getStackTrace();
- success = isInBlockingQueuePoll(stackTrace);
+ if (asyncConsumer.isAlive()) {
+ success = asyncConsumer.getState() == Thread.State.WAITING;
}
if (success) {
break;
} else {
// Retry
- Thread.sleep(500);
+ Thread.sleep(100);
}
}
@@ -355,33 +358,12 @@ public class SingleInputGateTest {
}
}
- /**
- * Returns whether the stack trace represents a Thread in a blocking queue
- * poll call.
- *
- * @param stackTrace Stack trace of the Thread to check
- *
- * @return Flag indicating whether the Thread is in a blocking queue poll
- * call.
- */
- private boolean isInBlockingQueuePoll(StackTraceElement[] stackTrace) {
- for (StackTraceElement elem : stackTrace) {
- if (elem.getMethodName().equals("poll") &&
- elem.getClassName().equals("java.util.concurrent.LinkedBlockingQueue")) {
-
- return true;
- }
- }
-
- return false;
- }
-
// ---------------------------------------------------------------------------------------------
static void verifyBufferOrEvent(
- InputGate inputGate,
- boolean isBuffer,
- int channelIndex) throws IOException, InterruptedException {
+ InputGate inputGate,
+ boolean isBuffer,
+ int channelIndex) throws IOException, InterruptedException {
final BufferOrEvent boe = inputGate.getNextBufferOrEvent();
assertEquals(isBuffer, boe.isBuffer());
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
index 7ea67b3..a6597a2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
@@ -19,10 +19,8 @@
package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.util.TestTaskEvent;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
@@ -46,7 +44,7 @@ public class TestInputChannel {
private final SingleInputGate inputGate;
// Abusing Mockito here... ;)
- protected OngoingStubbing<Buffer> stubbing;
+ protected OngoingStubbing<InputChannel.BufferAndAvailability> stubbing;
public TestInputChannel(SingleInputGate inputGate, int channelIndex) {
checkArgument(channelIndex >= 0);
@@ -57,13 +55,10 @@ public class TestInputChannel {
public TestInputChannel read(Buffer buffer) throws IOException, InterruptedException {
if (stubbing == null) {
- stubbing = when(mock.getNextBuffer()).thenReturn(buffer);
+ stubbing = when(mock.getNextBuffer()).thenReturn(new InputChannel.BufferAndAvailability(buffer, true));
+ } else {
+ stubbing = stubbing.thenReturn(new InputChannel.BufferAndAvailability(buffer, true));
}
- else {
- stubbing = stubbing.thenReturn(buffer);
- }
-
- inputGate.onAvailableBuffer(mock);
return this;
}
@@ -75,34 +70,23 @@ public class TestInputChannel {
return read(buffer);
}
- public TestInputChannel readEvent() throws IOException, InterruptedException {
- return read(EventSerializer.toBuffer(new TestTaskEvent()));
- }
-
- public TestInputChannel readEndOfSuperstepEvent() throws IOException, InterruptedException {
- return read(EventSerializer.toBuffer(EndOfSuperstepEvent.INSTANCE));
- }
-
public TestInputChannel readEndOfPartitionEvent() throws IOException, InterruptedException {
- final Answer<Buffer> answer = new Answer<Buffer>() {
+ final Answer<InputChannel.BufferAndAvailability> answer = new Answer<InputChannel.BufferAndAvailability>() {
@Override
- public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable {
+ public InputChannel.BufferAndAvailability answer(InvocationOnMock invocationOnMock) throws Throwable {
// Return true after finishing
when(mock.isReleased()).thenReturn(true);
- return EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+ return new InputChannel.BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false);
}
};
if (stubbing == null) {
stubbing = when(mock.getNextBuffer()).thenAnswer(answer);
- }
- else {
+ } else {
stubbing = stubbing.thenAnswer(answer);
}
- inputGate.onAvailableBuffer(mock);
-
return this;
}