You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/09/19 19:32:50 UTC

[flink] 05/11: [hotfix][network] adapt InputGateConcurrentTest to really follow our guarantees

This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9d2b74bf502cac3274cd6f2dc72db56c056ff92b
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Thu Sep 13 12:25:24 2018 +0200

    [hotfix][network] adapt InputGateConcurrentTest to really follow our guarantees
    
    - producers must flush after writing to make sure all data has actually been sent
    - bufferConsumer.isFinished() can only be checked after build() has produced a Buffer
    - producer/consumer threads are now named ("producer" / "consumer")
---
 .../network/partition/InputGateConcurrentTest.java | 22 ++++++++++++++++++++--
 1 file changed, 20 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
index 5f5728d..5c643af 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
@@ -198,6 +199,8 @@ public class InputGateConcurrentTest {
 	private abstract static class Source {
 
 		abstract void addBufferConsumer(BufferConsumer bufferConsumer) throws Exception;
+
+		abstract void flush();
 	}
 
 	private static class PipelinedSubpartitionSource extends Source {
@@ -212,6 +215,11 @@ public class InputGateConcurrentTest {
 		void addBufferConsumer(BufferConsumer bufferConsumer) throws Exception {
 			partition.add(bufferConsumer);
 		}
+
+		@Override
+		void flush() {
+			partition.flush();
+		}
 	}
 
 	private static class RemoteChannelSource extends Source {
@@ -225,14 +233,19 @@ public class InputGateConcurrentTest {
 
 		@Override
 		void addBufferConsumer(BufferConsumer bufferConsumer) throws Exception {
-			checkState(bufferConsumer.isFinished(), "Handling of non finished buffers is not yet implemented");
 			try {
-				channel.onBuffer(bufferConsumer.build(), seq++, -1);
+				Buffer buffer = bufferConsumer.build();
+				checkState(bufferConsumer.isFinished(), "Handling of non finished buffers is not yet implemented");
+				channel.onBuffer(buffer, seq++, -1);
 			}
 			finally {
 				bufferConsumer.close();
 			}
 		}
+
+		@Override
+		void flush() {
+		}
 	}
 
 	// ------------------------------------------------------------------------
@@ -248,6 +261,7 @@ public class InputGateConcurrentTest {
 		private final int yieldAfter;
 
 		ProducerThread(Source[] sources, int numTotal, int maxChunk, int yieldAfter) {
+			super("producer");
 			this.sources = sources;
 			this.numTotal = numTotal;
 			this.maxChunk = maxChunk;
@@ -276,7 +290,10 @@ public class InputGateConcurrentTest {
 					//noinspection CallToThreadYield
 					Thread.yield();
 				}
+			}
 
+			for (Source source : sources) {
+				source.flush();
 			}
 		}
 	}
@@ -287,6 +304,7 @@ public class InputGateConcurrentTest {
 		private final int numBuffers;
 
 		ConsumerThread(SingleInputGate gate, int numBuffers) {
+			super("consumer");
 			this.gate = gate;
 			this.numBuffers = numBuffers;
 		}