You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/09/19 10:28:22 UTC
[flink] 05/09: [hotfix][network] adapt InputGateConcurrentTest to really follow our guarantees
This is an automated email from the ASF dual-hosted git repository.
nkruber pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b75e8651f7a818fa1b8bca3b60eeebe93a1b792d
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Thu Sep 13 12:25:24 2018 +0200
[hotfix][network] adapt InputGateConcurrentTest to really follow our guarantees
- producers should flush after writing to make sure all data has been sent
- we can only check bufferConsumer.isFinished() after building a Buffer
- producer/consumer threads should be named
---
.../network/partition/InputGateConcurrentTest.java | 22 ++++++++++++++++++++--
1 file changed, 20 insertions(+), 2 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
index 5f5728d..5c643af 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
@@ -198,6 +199,8 @@ public class InputGateConcurrentTest {
private abstract static class Source {
abstract void addBufferConsumer(BufferConsumer bufferConsumer) throws Exception;
+
+ abstract void flush();
}
private static class PipelinedSubpartitionSource extends Source {
@@ -212,6 +215,11 @@ public class InputGateConcurrentTest {
void addBufferConsumer(BufferConsumer bufferConsumer) throws Exception {
partition.add(bufferConsumer);
}
+
+ @Override
+ void flush() {
+ partition.flush();
+ }
}
private static class RemoteChannelSource extends Source {
@@ -225,14 +233,19 @@ public class InputGateConcurrentTest {
@Override
void addBufferConsumer(BufferConsumer bufferConsumer) throws Exception {
- checkState(bufferConsumer.isFinished(), "Handling of non finished buffers is not yet implemented");
try {
- channel.onBuffer(bufferConsumer.build(), seq++, -1);
+ Buffer buffer = bufferConsumer.build();
+ checkState(bufferConsumer.isFinished(), "Handling of non finished buffers is not yet implemented");
+ channel.onBuffer(buffer, seq++, -1);
}
finally {
bufferConsumer.close();
}
}
+
+ @Override
+ void flush() {
+ }
}
// ------------------------------------------------------------------------
@@ -248,6 +261,7 @@ public class InputGateConcurrentTest {
private final int yieldAfter;
ProducerThread(Source[] sources, int numTotal, int maxChunk, int yieldAfter) {
+ super("producer");
this.sources = sources;
this.numTotal = numTotal;
this.maxChunk = maxChunk;
@@ -276,7 +290,10 @@ public class InputGateConcurrentTest {
//noinspection CallToThreadYield
Thread.yield();
}
+ }
+ for (Source source : sources) {
+ source.flush();
}
}
}
@@ -287,6 +304,7 @@ public class InputGateConcurrentTest {
private final int numBuffers;
ConsumerThread(SingleInputGate gate, int numBuffers) {
+ super("consumer");
this.gate = gate;
this.numBuffers = numBuffers;
}