You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/12 07:35:13 UTC

[5/5] flink git commit: [FLINK-9087] [runtime] change the return type of RecordWriter.broadcastEvent() from BufferConsumer to void

[FLINK-9087] [runtime] change the return type of RecordWriter.broadcastEvent() from BufferConsumer to void

This closes #5802.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0a5a64ab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0a5a64ab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0a5a64ab

Branch: refs/heads/master
Commit: 0a5a64ab746107c36a207a7cc62a75a88eabf1ae
Parents: 4742c34
Author: triones.deng <tr...@vipshop.com>
Authored: Tue Apr 3 10:28:52 2018 +0800
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 11 14:21:36 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/io/network/api/writer/RecordWriter.java   | 3 +--
 .../runtime/io/network/api/writer/RecordWriterTest.java     | 9 +++++++--
 2 files changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0a5a64ab/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index c35c7f3..e3a8e49 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -150,7 +150,7 @@ public class RecordWriter<T extends IOReadableWritable> {
 		}
 	}
 
-	public BufferConsumer broadcastEvent(AbstractEvent event) throws IOException {
+	public void broadcastEvent(AbstractEvent event) throws IOException {
 		try (BufferConsumer eventBufferConsumer = EventSerializer.toBufferConsumer(event)) {
 			for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
 				RecordSerializer<T> serializer = serializers[targetChannel];
@@ -164,7 +164,6 @@ public class RecordWriter<T extends IOReadableWritable> {
 			if (flushAlways) {
 				flushAll();
 			}
-			return eventBufferConsumer;
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0a5a64ab/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index ec0dfe2..0b0a236 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -299,18 +299,23 @@ public class RecordWriterTest {
 			new CollectingPartitionWriter(queues, new TestPooledBufferProvider(Integer.MAX_VALUE));
 		RecordWriter<?> writer = new RecordWriter<>(partition);
 
-		BufferConsumer bufferConsumer = writer.broadcastEvent(EndOfPartitionEvent.INSTANCE);
+		writer.broadcastEvent(EndOfPartitionEvent.INSTANCE);
 
 		// Verify added to all queues
 		assertEquals(1, queues[0].size());
 		assertEquals(1, queues[1].size());
 
+		// get references to buffer consumers (copies from the original event buffer consumer)
+		BufferConsumer bufferConsumer1 = queues[0].getFirst();
+		BufferConsumer bufferConsumer2 = queues[1].getFirst();
+
 		// process all collected events (recycles the buffer)
 		for (int i = 0; i < queues.length; i++) {
 			assertTrue(parseBuffer(queues[i].remove(), i).isEvent());
 		}
 
-		assertTrue(bufferConsumer.isRecycled());
+		assertTrue(bufferConsumer1.isRecycled());
+		assertTrue(bufferConsumer2.isRecycled());
 	}
 
 	/**