You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/12 07:35:13 UTC
[5/5] flink git commit: [FLINK-9087] [runtime] change the method
signature of RecordWriter.broadcastEvent() from BufferConsumer to void
[FLINK-9087] [runtime] change the method signature of RecordWriter.broadcastEvent() from BufferConsumer to void
This closes #5802.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0a5a64ab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0a5a64ab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0a5a64ab
Branch: refs/heads/master
Commit: 0a5a64ab746107c36a207a7cc62a75a88eabf1ae
Parents: 4742c34
Author: triones.deng <tr...@vipshop.com>
Authored: Tue Apr 3 10:28:52 2018 +0800
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 11 14:21:36 2018 +0200
----------------------------------------------------------------------
.../flink/runtime/io/network/api/writer/RecordWriter.java | 3 +--
.../runtime/io/network/api/writer/RecordWriterTest.java | 9 +++++++--
2 files changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0a5a64ab/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index c35c7f3..e3a8e49 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -150,7 +150,7 @@ public class RecordWriter<T extends IOReadableWritable> {
}
}
- public BufferConsumer broadcastEvent(AbstractEvent event) throws IOException {
+ public void broadcastEvent(AbstractEvent event) throws IOException {
try (BufferConsumer eventBufferConsumer = EventSerializer.toBufferConsumer(event)) {
for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
RecordSerializer<T> serializer = serializers[targetChannel];
@@ -164,7 +164,6 @@ public class RecordWriter<T extends IOReadableWritable> {
if (flushAlways) {
flushAll();
}
- return eventBufferConsumer;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a5a64ab/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index ec0dfe2..0b0a236 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -299,18 +299,23 @@ public class RecordWriterTest {
new CollectingPartitionWriter(queues, new TestPooledBufferProvider(Integer.MAX_VALUE));
RecordWriter<?> writer = new RecordWriter<>(partition);
- BufferConsumer bufferConsumer = writer.broadcastEvent(EndOfPartitionEvent.INSTANCE);
+ writer.broadcastEvent(EndOfPartitionEvent.INSTANCE);
// Verify added to all queues
assertEquals(1, queues[0].size());
assertEquals(1, queues[1].size());
+ // get references to buffer consumers (copies from the original event buffer consumer)
+ BufferConsumer bufferConsumer1 = queues[0].getFirst();
+ BufferConsumer bufferConsumer2 = queues[1].getFirst();
+
// process all collected events (recycles the buffer)
for (int i = 0; i < queues.length; i++) {
assertTrue(parseBuffer(queues[i].remove(), i).isEvent());
}
- assertTrue(bufferConsumer.isRecycled());
+ assertTrue(bufferConsumer1.isRecycled());
+ assertTrue(bufferConsumer2.isRecycled());
}
/**