You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/01/09 15:24:43 UTC
[1/4] flink git commit: [FLINK-5066] [network] Add
EventSerializer.isEvent
Repository: flink
Updated Branches:
refs/heads/master 384da0e99 -> 9cff8c92b
[FLINK-5066] [network] Add EventSerializer.isEvent
This allows PartitionRequestQueue to peak into event buffers instead of de-serializing the full event class.
This closes #2806.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/94574652
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/94574652
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/94574652
Branch: refs/heads/master
Commit: 945746527c554bad5ba30fc953c33b35cf78b816
Parents: 4a6b0fa
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Dec 8 14:17:34 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Jan 9 16:22:41 2017 +0100
----------------------------------------------------------------------
.../api/serialization/EventSerializer.java | 80 ++++++++++++++++++++
.../io/network/netty/PartitionRequestQueue.java | 3 +-
.../api/serialization/EventSerializerTest.java | 80 ++++++++++++++++++++
3 files changed, 162 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/94574652/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index 0bc3b28..4d9f431 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -91,6 +91,70 @@ public class EventSerializer {
}
}
+ /**
+ * Identifies whether the given buffer encodes the given event.
+ *
+ * <p><strong>Pre-condition</strong>: This buffer must encode some event!</p>
+ *
+ * @param buffer the buffer to peak into
+ * @param eventClass the expected class of the event type
+ * @param classLoader the class loader to use for custom event classes
+ * @return whether the event class of the <tt>buffer</tt> matches the given <tt>eventClass</tt>
+ * @throws IOException
+ */
+ private static boolean isEvent(ByteBuffer buffer, Class<?> eventClass, ClassLoader classLoader) throws IOException {
+ if (buffer.remaining() < 4) {
+ throw new IOException("Incomplete event");
+ }
+
+ final int bufferPos = buffer.position();
+ final ByteOrder bufferOrder = buffer.order();
+ buffer.order(ByteOrder.BIG_ENDIAN);
+
+ try {
+ int type = buffer.getInt();
+
+ switch (type) {
+ case END_OF_PARTITION_EVENT:
+ return eventClass.equals(EndOfPartitionEvent.class);
+ case CHECKPOINT_BARRIER_EVENT:
+ return eventClass.equals(CheckpointBarrier.class);
+ case END_OF_SUPERSTEP_EVENT:
+ return eventClass.equals(EndOfSuperstepEvent.class);
+ case CANCEL_CHECKPOINT_MARKER_EVENT:
+ return eventClass.equals(CancelCheckpointMarker.class);
+ case OTHER_EVENT:
+ try {
+ final DataInputDeserializer deserializer = new DataInputDeserializer(buffer);
+ final String className = deserializer.readUTF();
+
+ final Class<? extends AbstractEvent> clazz;
+ try {
+ clazz = classLoader.loadClass(className).asSubclass(AbstractEvent.class);
+ }
+ catch (ClassNotFoundException e) {
+ throw new IOException("Could not load event class '" + className + "'.", e);
+ }
+ catch (ClassCastException e) {
+ throw new IOException("The class '" + className + "' is not a valid subclass of '"
+ + AbstractEvent.class.getName() + "'.", e);
+ }
+ return eventClass.equals(clazz);
+ }
+ catch (Exception e) {
+ throw new IOException("Error while deserializing or instantiating event.", e);
+ }
+ default:
+ throw new IOException("Corrupt byte stream for event");
+ }
+ }
+ finally {
+ buffer.order(bufferOrder);
+ // restore the original position in the buffer (recall: we only peak into it!)
+ buffer.position(bufferPos);
+ }
+ }
+
public static AbstractEvent fromSerializedEvent(ByteBuffer buffer, ClassLoader classLoader) throws IOException {
if (buffer.remaining() < 4) {
throw new IOException("Incomplete event");
@@ -170,4 +234,20 @@ public class EventSerializer {
public static AbstractEvent fromBuffer(Buffer buffer, ClassLoader classLoader) throws IOException {
return fromSerializedEvent(buffer.getNioBuffer(), classLoader);
}
+
+ /**
+ * Identifies whether the given buffer encodes the given event.
+ *
+ * @param buffer the buffer to peak into
+ * @param eventClass the expected class of the event type
+ * @param classLoader the class loader to use for custom event classes
+ * @return whether the event class of the <tt>buffer</tt> matches the given <tt>eventClass</tt>
+ * @throws IOException
+ */
+ public static boolean isEvent(final Buffer buffer,
+ final Class<?> eventClass,
+ final ClassLoader classLoader) throws IOException {
+ return !buffer.isBuffer() &&
+ isEvent(buffer.getNioBuffer(), eventClass, classLoader);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/94574652/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index 297911a..5330dac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -218,7 +218,8 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
}
private boolean isEndOfPartitionEvent(Buffer buffer) throws IOException {
- return !buffer.isBuffer() && EventSerializer.fromBuffer(buffer, getClass().getClassLoader()).getClass() == EndOfPartitionEvent.class;
+ return EventSerializer.isEvent(buffer, EndOfPartitionEvent.class,
+ getClass().getClassLoader());
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/94574652/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index d47a0b5..271d0d2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -23,13 +23,16 @@ import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.util.TestTaskEvent;
import org.junit.Test;
+import java.io.IOException;
import java.nio.ByteBuffer;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -55,4 +58,81 @@ public class EventSerializerTest {
assertEquals(evt, deserialized);
}
}
+
+ /**
+ * Tests {@link EventSerializer#isEvent(Buffer, Class, ClassLoader)}
+ * whether it peaks into the buffer only, i.e. after the call, the buffer
+ * is still de-serializable.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testIsEventPeakOnly() throws Exception {
+ final Buffer serializedEvent =
+ EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+ try {
+ final ClassLoader cl = getClass().getClassLoader();
+ assertTrue(
+ EventSerializer
+ .isEvent(serializedEvent, EndOfPartitionEvent.class, cl));
+ EndOfPartitionEvent event = (EndOfPartitionEvent) EventSerializer
+ .fromBuffer(serializedEvent, cl);
+ assertEquals(EndOfPartitionEvent.INSTANCE, event);
+ } finally {
+ serializedEvent.recycle();
+ }
+ }
+
+ /**
+ * Tests {@link EventSerializer#isEvent(Buffer, Class, ClassLoader)} returns
+ * the correct answer for various encoded event buffers.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testIsEvent() throws Exception {
+ AbstractEvent[] events = {
+ EndOfPartitionEvent.INSTANCE,
+ EndOfSuperstepEvent.INSTANCE,
+ new CheckpointBarrier(1678L, 4623784L),
+ new TestTaskEvent(Math.random(), 12361231273L),
+ new CancelCheckpointMarker(287087987329842L)
+ };
+
+ for (AbstractEvent evt : events) {
+ for (AbstractEvent evt2 : events) {
+ if (evt == evt2) {
+ assertTrue(checkIsEvent(evt, evt2.getClass()));
+ } else {
+ assertFalse(checkIsEvent(evt, evt2.getClass()));
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns the result of
+ * {@link EventSerializer#isEvent(Buffer, Class, ClassLoader)} on a buffer
+ * that encodes the given <tt>event</tt>.
+ *
+ * @param event the event to encode
+ * @param eventClass the event class to check against
+ *
+ * @return whether {@link EventSerializer#isEvent(ByteBuffer, Class, ClassLoader)}
+ * thinks the encoded buffer matches the class
+ * @throws IOException
+ */
+ private final boolean checkIsEvent(final AbstractEvent event,
+ final Class<? extends AbstractEvent> eventClass) throws
+ IOException {
+ final Buffer serializedEvent =
+ EventSerializer.toBuffer(event);
+ try {
+ final ClassLoader cl = getClass().getClassLoader();
+ return EventSerializer
+ .isEvent(serializedEvent, eventClass, cl);
+ } finally {
+ serializedEvent.recycle();
+ }
+ }
}
[4/4] flink git commit: [FLINK-5059] [network] Only serialise events
once during RecordWriter#broadcastEvent
Posted by uc...@apache.org.
[FLINK-5059] [network] Only serialise events once during RecordWriter#broadcastEvent
This closes #2805.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9cff8c92
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9cff8c92
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9cff8c92
Branch: refs/heads/master
Commit: 9cff8c92bf4ec0d6ada7ef2595679d5bb6daf522
Parents: 9457465
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon Nov 14 16:16:30 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Jan 9 16:23:50 2017 +0100
----------------------------------------------------------------------
.../io/network/api/writer/RecordWriter.java | 44 +++++-----
.../api/writer/ResultPartitionWriter.java | 38 +++++----
.../iterative/task/IterationHeadTask.java | 7 +-
.../task/IterationIntermediateTask.java | 2 +-
.../io/network/api/writer/RecordWriterTest.java | 64 ++++++++++----
.../api/writer/ResultPartitionWriterTest.java | 87 ++++++++++++++++++++
.../streaming/runtime/tasks/StreamTask.java | 3 +-
.../runtime/tasks/StreamMockEnvironment.java | 76 ++++++++++-------
8 files changed, 229 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9cff8c92/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index dfa2f3d..9283b70 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.network.api.writer;
import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
@@ -131,35 +132,30 @@ public class RecordWriter<T extends IOReadableWritable> {
}
public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
- for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
- RecordSerializer<T> serializer = serializers[targetChannel];
-
- synchronized (serializer) {
- Buffer buffer = serializer.getCurrentBuffer();
- if (buffer != null) {
- writeAndClearBuffer(buffer, targetChannel, serializer);
- } else if (serializer.hasData()) {
- throw new IllegalStateException("No buffer, but serializer has buffered data.");
- }
-
- targetPartition.writeEvent(event, targetChannel);
- }
- }
- }
+ final Buffer eventBuffer = EventSerializer.toBuffer(event);
+ try {
+ for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
+ RecordSerializer<T> serializer = serializers[targetChannel];
- public void sendEndOfSuperstep() throws IOException, InterruptedException {
- for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
- RecordSerializer<T> serializer = serializers[targetChannel];
+ synchronized (serializer) {
+ Buffer buffer = serializer.getCurrentBuffer();
+ if (buffer != null) {
+ writeAndClearBuffer(buffer, targetChannel, serializer);
+ } else if (serializer.hasData()) {
+ // sanity check
+ throw new IllegalStateException("No buffer, but serializer has buffered data.");
+ }
- synchronized (serializer) {
- Buffer buffer = serializer.getCurrentBuffer();
- if (buffer != null) {
- writeAndClearBuffer(buffer, targetChannel, serializer);
+ // retain the buffer so that it can be recycled by each channel of targetPartition
+ eventBuffer.retain();
+ targetPartition.writeBuffer(eventBuffer, targetChannel);
}
}
+ } finally {
+ // we do not need to further retain the eventBuffer
+ // (it will be recycled after the last channel stops using it)
+ eventBuffer.recycle();
}
-
- targetPartition.writeEndOfSuperstep();
}
public void flush() throws IOException {
http://git-wip-us.apache.org/repos/asf/flink/blob/9cff8c92/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index cfab34d..4ca7616 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -18,11 +18,8 @@
package org.apache.flink.runtime.io.network.api.writer;
-import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.event.TaskEvent;
-import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
import org.apache.flink.runtime.io.network.api.TaskEventHandler;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
@@ -71,21 +68,26 @@ public class ResultPartitionWriter implements EventListener<TaskEvent> {
partition.add(buffer, targetChannel);
}
- public void writeEvent(AbstractEvent event, int targetChannel) throws IOException {
- partition.add(EventSerializer.toBuffer(event), targetChannel);
- }
-
- public void writeEventToAllChannels(AbstractEvent event) throws IOException {
- for (int i = 0; i < partition.getNumberOfSubpartitions(); i++) {
- Buffer buffer = EventSerializer.toBuffer(event);
- partition.add(buffer, i);
- }
- }
-
- public void writeEndOfSuperstep() throws IOException {
- for (int i = 0; i < partition.getNumberOfSubpartitions(); i++) {
- Buffer buffer = EventSerializer.toBuffer(EndOfSuperstepEvent.INSTANCE);
- partition.add(buffer, i);
+ /**
+ * Writes the given buffer to all available target channels.
+ *
+ * The buffer is taken over and used for each of the channels.
+ * It will be recycled afterwards.
+ *
+ * @param eventBuffer the buffer to write
+ * @throws IOException
+ */
+ public void writeBufferToAllChannels(final Buffer eventBuffer) throws IOException {
+ try {
+ for (int targetChannel = 0; targetChannel < partition.getNumberOfSubpartitions(); targetChannel++) {
+ // retain the buffer so that it can be recycled by each channel of targetPartition
+ eventBuffer.retain();
+ writeBuffer(eventBuffer, targetChannel);
+ }
+ } finally {
+ // we do not need to further retain the eventBuffer
+ // (it will be recycled after the last channel stops using it)
+ eventBuffer.recycle();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9cff8c92/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
index 2e3285c..575072d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
@@ -22,6 +22,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.operators.Driver;
@@ -426,7 +428,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
}
for (RecordWriter<?> eventualOutput : this.eventualOutputs) {
- eventualOutput.sendEndOfSuperstep();
+ eventualOutput.broadcastEvent(EndOfSuperstepEvent.INSTANCE);
}
}
@@ -434,6 +436,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
if (log.isInfoEnabled()) {
log.info(formatLogString("sending " + WorkerDoneEvent.class.getSimpleName() + " to sync"));
}
- this.toSync.writeEventToAllChannels(event);
+
+ this.toSync.writeBufferToAllChannels(EventSerializer.toBuffer(event));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9cff8c92/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
index 41f8e31..16a7008 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
@@ -124,7 +124,7 @@ public class IterationIntermediateTask<S extends Function, OT> extends AbstractI
private void sendEndOfSuperstep() throws IOException, InterruptedException {
for (RecordWriter eventualOutput : this.eventualOutputs) {
- eventualOutput.sendEndOfSuperstep();
+ eventualOutput.broadcastEvent(EndOfSuperstepEvent.INSTANCE);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9cff8c92/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 43a93c6..63175ed 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -26,6 +26,9 @@ import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer.SerializationResult;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -33,6 +36,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
+import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
import org.apache.flink.runtime.io.network.util.TestTaskEvent;
import org.apache.flink.runtime.testutils.DiscardingRecycler;
import org.apache.flink.types.IntValue;
@@ -42,6 +46,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@@ -68,7 +73,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-@PrepareForTest(ResultPartitionWriter.class)
+@PrepareForTest({ResultPartitionWriter.class, EventSerializer.class})
@RunWith(PowerMockRunner.class)
public class RecordWriterTest {
@@ -242,7 +247,7 @@ public class RecordWriterTest {
try {
// Verify that end of super step correctly clears the buffer.
recordWriter.emit(new IntValue(0));
- recordWriter.sendEndOfSuperstep();
+ recordWriter.broadcastEvent(EndOfSuperstepEvent.INSTANCE);
Assert.fail("Did not throw expected test Exception");
}
@@ -395,6 +400,39 @@ public class RecordWriterTest {
assertEquals(1, queues[3].size()); // 0 buffers + 1 event
}
+ /**
+ * Tests that event buffers are properly recycled when broadcasting events
+ * to multiple channels.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testBroadcastEventBufferReferenceCounting() throws Exception {
+ Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+
+ // Partial mocking of static method...
+ PowerMockito
+ .stub(PowerMockito.method(EventSerializer.class, "toBuffer"))
+ .toReturn(buffer);
+
+ @SuppressWarnings("unchecked")
+ ArrayDeque<BufferOrEvent>[] queues =
+ new ArrayDeque[]{new ArrayDeque(), new ArrayDeque()};
+
+ ResultPartitionWriter partition =
+ createCollectingPartitionWriter(queues,
+ new TestInfiniteBufferProvider());
+ RecordWriter<?> writer = new RecordWriter<>(partition);
+
+ writer.broadcastEvent(EndOfPartitionEvent.INSTANCE);
+
+ // Verify added to all queues
+ assertEquals(1, queues[0].size());
+ assertEquals(1, queues[1].size());
+
+ assertTrue(buffer.isRecycled());
+ }
+
// ---------------------------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------------------------
@@ -421,22 +459,20 @@ public class RecordWriterTest {
@Override
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
Buffer buffer = (Buffer) invocationOnMock.getArguments()[0];
- Integer targetChannel = (Integer) invocationOnMock.getArguments()[1];
- queues[targetChannel].add(new BufferOrEvent(buffer, targetChannel));
+ if (buffer.isBuffer()) {
+ Integer targetChannel = (Integer) invocationOnMock.getArguments()[1];
+ queues[targetChannel].add(new BufferOrEvent(buffer, targetChannel));
+ } else {
+ // is event:
+ AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
+ buffer.recycle(); // the buffer is not needed anymore
+ Integer targetChannel = (Integer) invocationOnMock.getArguments()[1];
+ queues[targetChannel].add(new BufferOrEvent(event, targetChannel));
+ }
return null;
}
}).when(partitionWriter).writeBuffer(any(Buffer.class), anyInt());
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
- AbstractEvent event = (AbstractEvent) invocationOnMock.getArguments()[0];
- Integer targetChannel = (Integer) invocationOnMock.getArguments()[1];
- queues[targetChannel].add(new BufferOrEvent(event, targetChannel));
- return null;
- }
- }).when(partitionWriter).writeEvent(any(AbstractEvent.class), anyInt());
-
return partitionWriter;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9cff8c92/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriterTest.java
new file mode 100644
index 0000000..104bc87
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriterTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.writer;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
+import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.ArrayDeque;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+@PrepareForTest({ResultPartitionWriter.class})
+@RunWith(PowerMockRunner.class)
+public class ResultPartitionWriterTest {
+
+ // ---------------------------------------------------------------------------------------------
+ // Resource release tests
+ // ---------------------------------------------------------------------------------------------
+
+ /**
+ * Tests that event buffers are properly recycled when broadcasting events
+ * to multiple channels.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testWriteBufferToAllChannelsReferenceCounting() throws Exception {
+ Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+
+ ResultPartition partition = new ResultPartition(
+ "TestTask",
+ mock(TaskActions.class),
+ new JobID(),
+ new ResultPartitionID(),
+ ResultPartitionType.PIPELINED,
+ 2,
+ mock(ResultPartitionManager.class),
+ mock(ResultPartitionConsumableNotifier.class),
+ mock(IOManager.class),
+ false);
+ ResultPartitionWriter partitionWriter =
+ new ResultPartitionWriter(
+ partition);
+
+ partitionWriter.writeBufferToAllChannels(buffer);
+
+ // Verify added to all queues, i.e. two buffers in total
+ assertEquals(2, partition.getTotalNumberOfBuffers());
+ // release the buffers in the partition
+ partition.release();
+
+ assertTrue(buffer.isRecycled());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9cff8c92/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index bd9215a..675c606 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
@@ -591,7 +592,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
// yet be created
final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
for (ResultPartitionWriter output : getEnvironment().getAllWriters()) {
- output.writeEventToAllChannels(message);
+ output.writeBufferToAllChannels(EventSerializer.toBuffer(message));
}
return false;
http://git-wip-us.apache.org/repos/asf/flink/blob/9cff8c92/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index b9a747c..58912ab 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -160,54 +161,27 @@ public class StreamMockEnvironment implements Environment {
final RecordDeserializer<DeserializationDelegate<T>> recordDeserializer = new AdaptiveSpanningRecordDeserializer<DeserializationDelegate<T>>();
final NonReusingDeserializationDelegate<T> delegate = new NonReusingDeserializationDelegate<T>(serializer);
- // Add records from the buffer to the output list
+ // Add records and events from the buffer to the output list
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
Buffer buffer = (Buffer) invocationOnMock.getArguments()[0];
-
- recordDeserializer.setNextBuffer(buffer);
-
- while (recordDeserializer.hasUnfinishedData()) {
- RecordDeserializer.DeserializationResult result = recordDeserializer.getNextRecord(delegate);
-
- if (result.isFullRecord()) {
- outputList.add(delegate.getInstance());
- }
-
- if (result == RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER
- || result == RecordDeserializer.DeserializationResult.PARTIAL_RECORD) {
- break;
- }
- }
-
+ addBufferToOutputList(recordDeserializer, delegate, buffer, outputList);
return null;
}
}).when(mockWriter).writeBuffer(any(Buffer.class), anyInt());
- // Add events to the output list
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
- AbstractEvent event = (AbstractEvent) invocationOnMock.getArguments()[0];
-
- outputList.add(event);
+ Buffer buffer = (Buffer) invocationOnMock.getArguments()[0];
+ addBufferToOutputList(recordDeserializer, delegate, buffer, outputList);
return null;
}
- }).when(mockWriter).writeEvent(any(AbstractEvent.class), anyInt());
-
- doAnswer(new Answer<Void>() {
-
- @Override
- public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
- AbstractEvent event = (AbstractEvent) invocationOnMock.getArguments()[0];
+ }).when(mockWriter).writeBufferToAllChannels(any(Buffer.class));
- outputList.add(event);
- return null;
- }
- }).when(mockWriter).writeEventToAllChannels(any(AbstractEvent.class));
outputs.add(mockWriter);
}
@@ -217,6 +191,44 @@ public class StreamMockEnvironment implements Environment {
}
}
+ /**
+ * Adds the object behind the given <tt>buffer</tt> to the <tt>outputList</tt>.
+ *
+ * @param recordDeserializer de-serializer to use for the buffer
+ * @param delegate de-serialization delegate to use for non-event buffers
+ * @param buffer the buffer to add
+ * @param outputList the output list to add the object to
+ * @param <T> type of the objects behind the non-event buffers
+ *
+ * @throws java.io.IOException
+ */
+ private <T> void addBufferToOutputList(
+ RecordDeserializer<DeserializationDelegate<T>> recordDeserializer,
+ NonReusingDeserializationDelegate<T> delegate, Buffer buffer,
+ final Queue<Object> outputList) throws java.io.IOException {
+ if (buffer.isBuffer()) {
+ recordDeserializer.setNextBuffer(buffer);
+
+ while (recordDeserializer.hasUnfinishedData()) {
+ RecordDeserializer.DeserializationResult result =
+ recordDeserializer.getNextRecord(delegate);
+
+ if (result.isFullRecord()) {
+ outputList.add(delegate.getInstance());
+ }
+
+ if (result == RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER
+ || result == RecordDeserializer.DeserializationResult.PARTIAL_RECORD) {
+ break;
+ }
+ }
+ } else {
+ // is event
+ AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
+ outputList.add(event);
+ }
+ }
+
@Override
public Configuration getTaskConfiguration() {
return this.taskConfiguration;
[2/4] flink git commit: [network] Clear serializer only once in
RecordWriter#flush()
Posted by uc...@apache.org.
[network] Clear serializer only once in RecordWriter#flush()
This closes #2829.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4a6b0fad
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4a6b0fad
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4a6b0fad
Branch: refs/heads/master
Commit: 4a6b0fad0909509cd1c37ed6e8d9dc7ec5d28d89
Parents: 161bab0
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Nov 17 14:01:52 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Jan 9 16:22:41 2017 +0100
----------------------------------------------------------------------
.../apache/flink/runtime/io/network/api/writer/RecordWriter.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4a6b0fad/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 87d34ff..dfa2f3d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -171,7 +171,7 @@ public class RecordWriter<T extends IOReadableWritable> {
Buffer buffer = serializer.getCurrentBuffer();
if (buffer != null) {
- writeAndClearBuffer(buffer, targetChannel, serializer);
+ targetPartition.writeBuffer(buffer, targetChannel);
}
} finally {
serializer.clear();
[3/4] flink git commit: [javadocs,
network] Add javadocs to SpanningRecordSerializer and RecordSerializer
Posted by uc...@apache.org.
[javadocs, network] Add javadocs to SpanningRecordSerializer and RecordSerializer
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/161bab04
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/161bab04
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/161bab04
Branch: refs/heads/master
Commit: 161bab04965b0110bfa80dcfcd1963f12b3cfda3
Parents: 384da0e
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Nov 17 13:57:18 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Jan 9 16:22:41 2017 +0100
----------------------------------------------------------------------
.../api/serialization/RecordSerializer.java | 72 ++++++++++++++++++--
.../serialization/SpanningRecordSerializer.java | 16 +++++
.../io/network/api/writer/RecordWriter.java | 5 +-
3 files changed, 85 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/161bab04/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
index e8179dc..5fe56c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
@@ -43,30 +43,90 @@ public interface RecordSerializer<T extends IOReadableWritable> {
this.isFullRecord = isFullRecord;
this.isFullBuffer = isFullBuffer;
}
-
+
+ /**
+ * Whether the full record was serialized and completely written to
+ * a target buffer.
+ *
+ * @return <tt>true</tt> if the complete record was written
+ */
public boolean isFullRecord() {
return this.isFullRecord;
}
-
+
+ /**
+ * Whether the target buffer is full after the serialization process.
+ *
+ * @return <tt>true</tt> if the target buffer is full
+ */
public boolean isFullBuffer() {
return this.isFullBuffer;
}
}
-
+
+ /**
+ * Starts serializing and copying the given record to the target buffer
+ * (if available).
+ *
+ * @param record the record to serialize
+ * @return how much information was written to the target buffer and
+ * whether this buffer is full
+ * @throws IOException
+ */
SerializationResult addRecord(T record) throws IOException;
+ /**
+ * Sets a (next) target buffer to use and continues writing remaining data
+ * to it until it is full.
+ *
+ * @param buffer the new target buffer to use
+ * @return how much information was written to the target buffer and
+ * whether this buffer is full
+ * @throws IOException
+ */
SerializationResult setNextBuffer(Buffer buffer) throws IOException;
+ /**
+ * Retrieves the current target buffer and sets its size to the actual
+ * number of written bytes.
+ *
+ * After calling this method, a new target buffer is required to continue
+ * writing (see {@link #setNextBuffer(Buffer)}).
+ *
+ * @return the target buffer that was used
+ */
Buffer getCurrentBuffer();
+ /**
+ * Resets the target buffer to <tt>null</tt>.
+ *
+ * <p><strong>NOTE:</strong> After calling this method, <strong>a new target
+ * buffer is required to continue writing</strong> (see
+ * {@link #setNextBuffer(Buffer)}).</p>
+ */
void clearCurrentBuffer();
-
+
+ /**
+ * Resets the target buffer to <tt>null</tt> and resets internal state set
+ * up for the record to serialize.
+ *
+ * <p><strong>NOTE:</strong> After calling this method, a <strong>new record
+ * and a new target buffer is required to start writing again</strong>
+ * (see {@link #setNextBuffer(Buffer)}). If you want to continue
+ * with the current record, use {@link #clearCurrentBuffer()} instead.</p>
+ */
void clear();
-
+
+ /**
+ * Determines whether data is left, either in the current target buffer or
+ * in any internal state set up for the record to serialize.
+ *
+ * @return <tt>true</tt> if some data is present
+ */
boolean hasData();
/**
- * Insantiates all metrics.
+ * Instantiates all metrics.
*
* @param metrics metric group
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/161bab04/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index a8fe3fe..335d12e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -29,6 +29,13 @@ import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.util.DataOutputSerializer;
+/**
+ * Record serializer which serializes the complete record to an intermediate
+ * data serialization buffer and copies this buffer to target buffers
+ * one-by-one using {@link #setNextBuffer(Buffer)}.
+ *
+ * @param <T>
+ */
public class SpanningRecordSerializer<T extends IOReadableWritable> implements RecordSerializer<T> {
/** Flag to enable/disable checks, if buffer not set/full or pending serialization */
@@ -65,6 +72,15 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
this.lengthBuffer.position(4);
}
+ /**
+ * Serializes the complete record to an intermediate data serialization
+ * buffer and starts copying it to the target buffer (if available).
+ *
+ * @param record the record to serialize
+ * @return how much information was written to the target buffer and
+ * whether this buffer is full
+ * @throws IOException
+ */
@Override
public SerializationResult addRecord(T record) throws IOException {
if (CHECKED) {
http://git-wip-us.apache.org/repos/asf/flink/blob/161bab04/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 799187d..87d34ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -208,9 +208,10 @@ public class RecordWriter<T extends IOReadableWritable> {
}
/**
- * Writes the buffer to the {@link ResultPartitionWriter}.
+ * Writes the buffer to the {@link ResultPartitionWriter} and removes the
+ * buffer from the serializer state.
*
- * <p> The buffer is cleared from the serializer state after a call to this method.
+ * Needs to be synchronized on the serializer!
*/
private void writeAndClearBuffer(
Buffer buffer,