Posted to commits@flink.apache.org by uc...@apache.org on 2017/01/09 15:24:43 UTC

[1/4] flink git commit: [FLINK-5066] [network] Add EventSerializer.isEvent

Repository: flink
Updated Branches:
  refs/heads/master 384da0e99 -> 9cff8c92b


[FLINK-5066] [network] Add EventSerializer.isEvent

This allows PartitionRequestQueue to peek into event buffers instead of de-serializing the full event.

This closes #2806.

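For illustration only (not part of the commit): a minimal sketch, based on the signatures added in the diff below, of how a caller such as PartitionRequestQueue can now check a buffer for a specific event type without materializing the event object. The wrapper class and method name are made up.

    import java.io.IOException;

    import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
    import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
    import org.apache.flink.runtime.io.network.buffer.Buffer;

    final class EventPeekExample {
        // Returns true iff the buffer encodes an EndOfPartitionEvent. isEvent only
        // reads the 4-byte event type tag (plus the class name for custom events)
        // and restores the buffer's position afterwards, so the buffer can still
        // be fully de-serialized later.
        static boolean isEndOfPartition(Buffer buffer, ClassLoader classLoader) throws IOException {
            return EventSerializer.isEvent(buffer, EndOfPartitionEvent.class, classLoader);
        }
    }

Compare this with the previous approach in PartitionRequestQueue (see the hunk below), which called EventSerializer.fromBuffer(...) and compared the class of the fully de-serialized event.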

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/94574652
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/94574652
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/94574652

Branch: refs/heads/master
Commit: 945746527c554bad5ba30fc953c33b35cf78b816
Parents: 4a6b0fa
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Dec 8 14:17:34 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Jan 9 16:22:41 2017 +0100

----------------------------------------------------------------------
 .../api/serialization/EventSerializer.java      | 80 ++++++++++++++++++++
 .../io/network/netty/PartitionRequestQueue.java |  3 +-
 .../api/serialization/EventSerializerTest.java  | 80 ++++++++++++++++++++
 3 files changed, 162 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/94574652/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index 0bc3b28..4d9f431 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -91,6 +91,70 @@ public class EventSerializer {
 		}
 	}
 
+	/**
+	 * Identifies whether the given buffer encodes the given event.
+	 *
+	 * <p><strong>Pre-condition</strong>: This buffer must encode some event!</p>
+	 *
+	 * @param buffer the buffer to peek into
+	 * @param eventClass the expected class of the event type
+	 * @param classLoader the class loader to use for custom event classes
+	 * @return whether the event class of the <tt>buffer</tt> matches the given <tt>eventClass</tt>
+	 * @throws IOException
+	 */
+	private static boolean isEvent(ByteBuffer buffer, Class<?> eventClass, ClassLoader classLoader) throws IOException {
+		if (buffer.remaining() < 4) {
+			throw new IOException("Incomplete event");
+		}
+
+		final int bufferPos = buffer.position();
+		final ByteOrder bufferOrder = buffer.order();
+		buffer.order(ByteOrder.BIG_ENDIAN);
+
+		try {
+			int type = buffer.getInt();
+
+			switch (type) {
+				case END_OF_PARTITION_EVENT:
+					return eventClass.equals(EndOfPartitionEvent.class);
+				case CHECKPOINT_BARRIER_EVENT:
+					return eventClass.equals(CheckpointBarrier.class);
+				case END_OF_SUPERSTEP_EVENT:
+					return eventClass.equals(EndOfSuperstepEvent.class);
+				case CANCEL_CHECKPOINT_MARKER_EVENT:
+					return eventClass.equals(CancelCheckpointMarker.class);
+				case OTHER_EVENT:
+					try {
+						final DataInputDeserializer deserializer = new DataInputDeserializer(buffer);
+						final String className = deserializer.readUTF();
+
+						final Class<? extends AbstractEvent> clazz;
+						try {
+							clazz = classLoader.loadClass(className).asSubclass(AbstractEvent.class);
+						}
+						catch (ClassNotFoundException e) {
+							throw new IOException("Could not load event class '" + className + "'.", e);
+						}
+						catch (ClassCastException e) {
+							throw new IOException("The class '" + className + "' is not a valid subclass of '"
+								+ AbstractEvent.class.getName() + "'.", e);
+						}
+						return eventClass.equals(clazz);
+					}
+					catch (Exception e) {
+						throw new IOException("Error while deserializing or instantiating event.", e);
+					}
+				default:
+					throw new IOException("Corrupt byte stream for event");
+			}
+		}
+		finally {
+			buffer.order(bufferOrder);
+			// restore the original position in the buffer (recall: we only peek into it!)
+			buffer.position(bufferPos);
+		}
+	}
+
 	public static AbstractEvent fromSerializedEvent(ByteBuffer buffer, ClassLoader classLoader) throws IOException {
 		if (buffer.remaining() < 4) {
 			throw new IOException("Incomplete event");
@@ -170,4 +234,20 @@ public class EventSerializer {
 	public static AbstractEvent fromBuffer(Buffer buffer, ClassLoader classLoader) throws IOException {
 		return fromSerializedEvent(buffer.getNioBuffer(), classLoader);
 	}
+
+	/**
+	 * Identifies whether the given buffer encodes the given event.
+	 *
+	 * @param buffer the buffer to peek into
+	 * @param eventClass the expected class of the event type
+	 * @param classLoader the class loader to use for custom event classes
+	 * @return whether the event class of the <tt>buffer</tt> matches the given <tt>eventClass</tt>
+	 * @throws IOException
+	 */
+	public static boolean isEvent(final Buffer buffer,
+		final Class<?> eventClass,
+		final ClassLoader classLoader) throws IOException {
+		return !buffer.isBuffer() &&
+			isEvent(buffer.getNioBuffer(), eventClass, classLoader);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/94574652/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index 297911a..5330dac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -218,7 +218,8 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 	}
 
 	private boolean isEndOfPartitionEvent(Buffer buffer) throws IOException {
-		return !buffer.isBuffer() && EventSerializer.fromBuffer(buffer, getClass().getClassLoader()).getClass() == EndOfPartitionEvent.class;
+		return EventSerializer.isEvent(buffer, EndOfPartitionEvent.class,
+			getClass().getClassLoader());
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/94574652/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index d47a0b5..271d0d2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -23,13 +23,16 @@ import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.util.TestTaskEvent;
 
 import org.junit.Test;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -55,4 +58,81 @@ public class EventSerializerTest {
 			assertEquals(evt, deserialized);
 		}
 	}
+
+	/**
+	 * Tests {@link EventSerializer#isEvent(Buffer, Class, ClassLoader)}
+	 * whether it only peeks into the buffer, i.e. after the call, the buffer
+	 * is still de-serializable.
+	 *
+	 * @throws Exception
+	 */
+	@Test
+	public void testIsEventPeekOnly() throws Exception {
+		final Buffer serializedEvent =
+			EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+		try {
+			final ClassLoader cl = getClass().getClassLoader();
+			assertTrue(
+				EventSerializer
+					.isEvent(serializedEvent, EndOfPartitionEvent.class, cl));
+			EndOfPartitionEvent event = (EndOfPartitionEvent) EventSerializer
+				.fromBuffer(serializedEvent, cl);
+			assertEquals(EndOfPartitionEvent.INSTANCE, event);
+		} finally {
+			serializedEvent.recycle();
+		}
+	}
+
+	/**
+	 * Tests {@link EventSerializer#isEvent(Buffer, Class, ClassLoader)} returns
+	 * the correct answer for various encoded event buffers.
+	 *
+	 * @throws Exception
+	 */
+	@Test
+	public void testIsEvent() throws Exception {
+		AbstractEvent[] events = {
+			EndOfPartitionEvent.INSTANCE,
+			EndOfSuperstepEvent.INSTANCE,
+			new CheckpointBarrier(1678L, 4623784L),
+			new TestTaskEvent(Math.random(), 12361231273L),
+			new CancelCheckpointMarker(287087987329842L)
+		};
+
+		for (AbstractEvent evt : events) {
+			for (AbstractEvent evt2 : events) {
+				if (evt == evt2) {
+					assertTrue(checkIsEvent(evt, evt2.getClass()));
+				} else {
+					assertFalse(checkIsEvent(evt, evt2.getClass()));
+				}
+			}
+		}
+	}
+
+	/**
+	 * Returns the result of
+	 * {@link EventSerializer#isEvent(Buffer, Class, ClassLoader)} on a buffer
+	 * that encodes the given <tt>event</tt>.
+	 *
+	 * @param event the event to encode
+	 * @param eventClass the event class to check against
+	 *
+	 * @return whether {@link EventSerializer#isEvent(Buffer, Class, ClassLoader)}
+	 * 		thinks the encoded buffer matches the class
+	 * @throws IOException
+	 */
+	private boolean checkIsEvent(
+			final AbstractEvent event,
+			final Class<? extends AbstractEvent> eventClass) throws IOException {
+		final Buffer serializedEvent =
+			EventSerializer.toBuffer(event);
+		try {
+			final ClassLoader cl = getClass().getClassLoader();
+			return EventSerializer
+				.isEvent(serializedEvent, eventClass, cl);
+		} finally {
+			serializedEvent.recycle();
+		}
+	}
 }


[4/4] flink git commit: [FLINK-5059] [network] Only serialise events once during RecordWriter#broadcastEvent

Posted by uc...@apache.org.
[FLINK-5059] [network] Only serialise events once during RecordWriter#broadcastEvent

This closes #2805.

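For illustration only (not part of the commit): a condensed sketch of the reference-counting pattern introduced below. It assumes the Buffer, EventSerializer and ResultPartitionWriter APIs shown in the diff, omits the serializer-draining logic of RecordWriter#broadcastEvent, and uses made-up class and method names.

    import java.io.IOException;

    import org.apache.flink.runtime.event.AbstractEvent;
    import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
    import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
    import org.apache.flink.runtime.io.network.buffer.Buffer;

    final class BroadcastOnceExample {
        // Serialize the event once and share the resulting buffer across all
        // channels: retain() adds one reference per channel (each channel later
        // recycles its own reference), and the final recycle() drops the
        // producer's reference so the buffer is freed after the last consumer.
        static void broadcast(AbstractEvent event, ResultPartitionWriter target, int numChannels)
                throws IOException {
            final Buffer eventBuffer = EventSerializer.toBuffer(event);
            try {
                for (int channel = 0; channel < numChannels; channel++) {
                    eventBuffer.retain();
                    target.writeBuffer(eventBuffer, channel);
                }
            } finally {
                eventBuffer.recycle();
            }
        }
    }

Before this change, every channel serialized the event again: ResultPartitionWriter#writeEvent called EventSerializer.toBuffer once per channel (see the removed methods below).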

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9cff8c92
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9cff8c92
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9cff8c92

Branch: refs/heads/master
Commit: 9cff8c92bf4ec0d6ada7ef2595679d5bb6daf522
Parents: 9457465
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon Nov 14 16:16:30 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Jan 9 16:23:50 2017 +0100

----------------------------------------------------------------------
 .../io/network/api/writer/RecordWriter.java     | 44 +++++-----
 .../api/writer/ResultPartitionWriter.java       | 38 +++++----
 .../iterative/task/IterationHeadTask.java       |  7 +-
 .../task/IterationIntermediateTask.java         |  2 +-
 .../io/network/api/writer/RecordWriterTest.java | 64 ++++++++++----
 .../api/writer/ResultPartitionWriterTest.java   | 87 ++++++++++++++++++++
 .../streaming/runtime/tasks/StreamTask.java     |  3 +-
 .../runtime/tasks/StreamMockEnvironment.java    | 76 ++++++++++-------
 8 files changed, 229 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9cff8c92/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index dfa2f3d..9283b70 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.api.writer;
 
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
@@ -131,35 +132,30 @@ public class RecordWriter<T extends IOReadableWritable> {
 	}
 
 	public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
-		for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
-			RecordSerializer<T> serializer = serializers[targetChannel];
-
-			synchronized (serializer) {
-				Buffer buffer = serializer.getCurrentBuffer();
-				if (buffer != null) {
-					writeAndClearBuffer(buffer, targetChannel, serializer);
-				} else if (serializer.hasData()) {
-					throw new IllegalStateException("No buffer, but serializer has buffered data.");
-				}
-
-				targetPartition.writeEvent(event, targetChannel);
-			}
-		}
-	}
+		final Buffer eventBuffer = EventSerializer.toBuffer(event);
+		try {
+			for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
+				RecordSerializer<T> serializer = serializers[targetChannel];
 
-	public void sendEndOfSuperstep() throws IOException, InterruptedException {
-		for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
-			RecordSerializer<T> serializer = serializers[targetChannel];
+				synchronized (serializer) {
+					Buffer buffer = serializer.getCurrentBuffer();
+					if (buffer != null) {
+						writeAndClearBuffer(buffer, targetChannel, serializer);
+					} else if (serializer.hasData()) {
+						// sanity check
+						throw new IllegalStateException("No buffer, but serializer has buffered data.");
+					}
 
-			synchronized (serializer) {
-				Buffer buffer = serializer.getCurrentBuffer();
-				if (buffer != null) {
-					writeAndClearBuffer(buffer, targetChannel, serializer);
+					// retain the buffer so that it can be recycled by each channel of targetPartition
+					eventBuffer.retain();
+					targetPartition.writeBuffer(eventBuffer, targetChannel);
 				}
 			}
+		} finally {
+			// we do not need to further retain the eventBuffer
+			// (it will be recycled after the last channel stops using it)
+			eventBuffer.recycle();
 		}
-
-		targetPartition.writeEndOfSuperstep();
 	}
 
 	public void flush() throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/9cff8c92/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index cfab34d..4ca7616 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -18,11 +18,8 @@
 
 package org.apache.flink.runtime.io.network.api.writer;
 
-import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.event.TaskEvent;
-import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
 import org.apache.flink.runtime.io.network.api.TaskEventHandler;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
@@ -71,21 +68,26 @@ public class ResultPartitionWriter implements EventListener<TaskEvent> {
 		partition.add(buffer, targetChannel);
 	}
 
-	public void writeEvent(AbstractEvent event, int targetChannel) throws IOException {
-		partition.add(EventSerializer.toBuffer(event), targetChannel);
-	}
-
-	public void writeEventToAllChannels(AbstractEvent event) throws IOException {
-		for (int i = 0; i < partition.getNumberOfSubpartitions(); i++) {
-			Buffer buffer = EventSerializer.toBuffer(event);
-			partition.add(buffer, i);
-		}
-	}
-
-	public void writeEndOfSuperstep() throws IOException {
-		for (int i = 0; i < partition.getNumberOfSubpartitions(); i++) {
-			Buffer buffer = EventSerializer.toBuffer(EndOfSuperstepEvent.INSTANCE);
-			partition.add(buffer, i);
+	/**
+	 * Writes the given buffer to all available target channels.
+	 *
+	 * The buffer is taken over and used for each of the channels.
+	 * It will be recycled afterwards.
+	 *
+	 * @param eventBuffer the buffer to write
+	 * @throws IOException
+	 */
+	public void writeBufferToAllChannels(final Buffer eventBuffer) throws IOException {
+		try {
+			for (int targetChannel = 0; targetChannel < partition.getNumberOfSubpartitions(); targetChannel++) {
+				// retain the buffer so that it can be recycled by each channel of targetPartition
+				eventBuffer.retain();
+				writeBuffer(eventBuffer, targetChannel);
+			}
+		} finally {
+			// we do not need to further retain the eventBuffer
+			// (it will be recycled after the last channel stops using it)
+			eventBuffer.recycle();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9cff8c92/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
index 2e3285c..575072d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.operators.Driver;
@@ -426,7 +428,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
 		}
 
 		for (RecordWriter<?> eventualOutput : this.eventualOutputs) {
-			eventualOutput.sendEndOfSuperstep();
+			eventualOutput.broadcastEvent(EndOfSuperstepEvent.INSTANCE);
 		}
 	}
 
@@ -434,6 +436,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
 		if (log.isInfoEnabled()) {
 			log.info(formatLogString("sending " + WorkerDoneEvent.class.getSimpleName() + " to sync"));
 		}
-		this.toSync.writeEventToAllChannels(event);
+
+		this.toSync.writeBufferToAllChannels(EventSerializer.toBuffer(event));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9cff8c92/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
index 41f8e31..16a7008 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
@@ -124,7 +124,7 @@ public class IterationIntermediateTask<S extends Function, OT> extends AbstractI
 
 	private void sendEndOfSuperstep() throws IOException, InterruptedException {
 		for (RecordWriter eventualOutput : this.eventualOutputs) {
-			eventualOutput.sendEndOfSuperstep();
+			eventualOutput.broadcastEvent(EndOfSuperstepEvent.INSTANCE);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9cff8c92/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 43a93c6..63175ed 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -26,6 +26,9 @@ import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer.SerializationResult;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -33,6 +36,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
+import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
 import org.apache.flink.runtime.io.network.util.TestTaskEvent;
 import org.apache.flink.runtime.testutils.DiscardingRecycler;
 import org.apache.flink.types.IntValue;
@@ -42,6 +46,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
@@ -68,7 +73,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-@PrepareForTest(ResultPartitionWriter.class)
+@PrepareForTest({ResultPartitionWriter.class, EventSerializer.class})
 @RunWith(PowerMockRunner.class)
 public class RecordWriterTest {
 
@@ -242,7 +247,7 @@ public class RecordWriterTest {
 			try {
 				// Verify that end of super step correctly clears the buffer.
 				recordWriter.emit(new IntValue(0));
-				recordWriter.sendEndOfSuperstep();
+				recordWriter.broadcastEvent(EndOfSuperstepEvent.INSTANCE);
 
 				Assert.fail("Did not throw expected test Exception");
 			}
@@ -395,6 +400,39 @@ public class RecordWriterTest {
 		assertEquals(1, queues[3].size()); // 0 buffers + 1 event
 	}
 
+	/**
+	 * Tests that event buffers are properly recycled when broadcasting events
+	 * to multiple channels.
+	 *
+	 * @throws Exception
+	 */
+	@Test
+	public void testBroadcastEventBufferReferenceCounting() throws Exception {
+		Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+
+		// Partial mocking of static method...
+		PowerMockito
+			.stub(PowerMockito.method(EventSerializer.class, "toBuffer"))
+			.toReturn(buffer);
+
+		@SuppressWarnings("unchecked")
+		ArrayDeque<BufferOrEvent>[] queues =
+			new ArrayDeque[]{new ArrayDeque(), new ArrayDeque()};
+
+		ResultPartitionWriter partition =
+			createCollectingPartitionWriter(queues,
+				new TestInfiniteBufferProvider());
+		RecordWriter<?> writer = new RecordWriter<>(partition);
+
+		writer.broadcastEvent(EndOfPartitionEvent.INSTANCE);
+
+		// Verify added to all queues
+		assertEquals(1, queues[0].size());
+		assertEquals(1, queues[1].size());
+
+		assertTrue(buffer.isRecycled());
+	}
+
 	// ---------------------------------------------------------------------------------------------
 	// Helpers
 	// ---------------------------------------------------------------------------------------------
@@ -421,22 +459,20 @@ public class RecordWriterTest {
 			@Override
 			public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
 				Buffer buffer = (Buffer) invocationOnMock.getArguments()[0];
-				Integer targetChannel = (Integer) invocationOnMock.getArguments()[1];
-				queues[targetChannel].add(new BufferOrEvent(buffer, targetChannel));
+				if (buffer.isBuffer()) {
+					Integer targetChannel = (Integer) invocationOnMock.getArguments()[1];
+					queues[targetChannel].add(new BufferOrEvent(buffer, targetChannel));
+				} else {
+					// is event:
+					AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
+					buffer.recycle(); // the buffer is not needed anymore
+					Integer targetChannel = (Integer) invocationOnMock.getArguments()[1];
+					queues[targetChannel].add(new BufferOrEvent(event, targetChannel));
+				}
 				return null;
 			}
 		}).when(partitionWriter).writeBuffer(any(Buffer.class), anyInt());
 
-		doAnswer(new Answer<Void>() {
-			@Override
-			public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
-				AbstractEvent event = (AbstractEvent) invocationOnMock.getArguments()[0];
-				Integer targetChannel = (Integer) invocationOnMock.getArguments()[1];
-				queues[targetChannel].add(new BufferOrEvent(event, targetChannel));
-				return null;
-			}
-		}).when(partitionWriter).writeEvent(any(AbstractEvent.class), anyInt());
-
 		return partitionWriter;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9cff8c92/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriterTest.java
new file mode 100644
index 0000000..104bc87
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriterTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.writer;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
+import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.ArrayDeque;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+@PrepareForTest({ResultPartitionWriter.class})
+@RunWith(PowerMockRunner.class)
+public class ResultPartitionWriterTest {
+
+	// ---------------------------------------------------------------------------------------------
+	// Resource release tests
+	// ---------------------------------------------------------------------------------------------
+
+	/**
+	 * Tests that event buffers are properly recycled when broadcasting events
+	 * to multiple channels.
+	 *
+	 * @throws Exception
+	 */
+	@Test
+	public void testWriteBufferToAllChannelsReferenceCounting() throws Exception {
+		Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+
+		ResultPartition partition = new ResultPartition(
+			"TestTask",
+			mock(TaskActions.class),
+			new JobID(),
+			new ResultPartitionID(),
+			ResultPartitionType.PIPELINED,
+			2,
+			mock(ResultPartitionManager.class),
+			mock(ResultPartitionConsumableNotifier.class),
+			mock(IOManager.class),
+			false);
+		ResultPartitionWriter partitionWriter =
+			new ResultPartitionWriter(
+				partition);
+
+		partitionWriter.writeBufferToAllChannels(buffer);
+
+		// Verify added to all queues, i.e. two buffers in total
+		assertEquals(2, partition.getTotalNumberOfBuffers());
+		// release the buffers in the partition
+		partition.release();
+
+		assertTrue(buffer.isRecycled());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9cff8c92/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index bd9215a..675c606 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
@@ -591,7 +592,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 				// yet be created
 				final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
 				for (ResultPartitionWriter output : getEnvironment().getAllWriters()) {
-					output.writeEventToAllChannels(message);
+					output.writeBufferToAllChannels(EventSerializer.toBuffer(message));
 				}
 
 				return false;

http://git-wip-us.apache.org/repos/asf/flink/blob/9cff8c92/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index b9a747c..58912ab 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -160,54 +161,27 @@ public class StreamMockEnvironment implements Environment {
 			final RecordDeserializer<DeserializationDelegate<T>> recordDeserializer = new AdaptiveSpanningRecordDeserializer<DeserializationDelegate<T>>();
 			final NonReusingDeserializationDelegate<T> delegate = new NonReusingDeserializationDelegate<T>(serializer);
 
-			// Add records from the buffer to the output list
+			// Add records and events from the buffer to the output list
 			doAnswer(new Answer<Void>() {
 
 				@Override
 				public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
 					Buffer buffer = (Buffer) invocationOnMock.getArguments()[0];
-
-					recordDeserializer.setNextBuffer(buffer);
-
-					while (recordDeserializer.hasUnfinishedData()) {
-						RecordDeserializer.DeserializationResult result = recordDeserializer.getNextRecord(delegate);
-
-						if (result.isFullRecord()) {
-							outputList.add(delegate.getInstance());
-						}
-
-						if (result == RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER
-							|| result == RecordDeserializer.DeserializationResult.PARTIAL_RECORD) {
-							break;
-						}
-					}
-
+					addBufferToOutputList(recordDeserializer, delegate, buffer, outputList);
 					return null;
 				}
 			}).when(mockWriter).writeBuffer(any(Buffer.class), anyInt());
 
-			// Add events to the output list
 			doAnswer(new Answer<Void>() {
 
 				@Override
 				public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
-					AbstractEvent event = (AbstractEvent) invocationOnMock.getArguments()[0];
-
-					outputList.add(event);
+					Buffer buffer = (Buffer) invocationOnMock.getArguments()[0];
+					addBufferToOutputList(recordDeserializer, delegate, buffer, outputList);
 					return null;
 				}
-			}).when(mockWriter).writeEvent(any(AbstractEvent.class), anyInt());
-
-			doAnswer(new Answer<Void>() {
-
-				@Override
-				public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
-					AbstractEvent event = (AbstractEvent) invocationOnMock.getArguments()[0];
+			}).when(mockWriter).writeBufferToAllChannels(any(Buffer.class));
 
-					outputList.add(event);
-					return null;
-				}
-			}).when(mockWriter).writeEventToAllChannels(any(AbstractEvent.class));
 
 			outputs.add(mockWriter);
 		}
@@ -217,6 +191,44 @@ public class StreamMockEnvironment implements Environment {
 		}
 	}
 
+	/**
+	 * Adds the object behind the given <tt>buffer</tt> to the <tt>outputList</tt>.
+	 *
+	 * @param recordDeserializer de-serializer to use for the buffer
+	 * @param delegate de-serialization delegate to use for non-event buffers
+	 * @param buffer the buffer to add
+	 * @param outputList the output list to add the object to
+	 * @param <T> type of the objects behind the non-event buffers
+	 *
+	 * @throws java.io.IOException
+	 */
+	private <T> void addBufferToOutputList(
+		RecordDeserializer<DeserializationDelegate<T>> recordDeserializer,
+		NonReusingDeserializationDelegate<T> delegate, Buffer buffer,
+		final Queue<Object> outputList) throws java.io.IOException {
+		if (buffer.isBuffer()) {
+			recordDeserializer.setNextBuffer(buffer);
+
+			while (recordDeserializer.hasUnfinishedData()) {
+				RecordDeserializer.DeserializationResult result =
+					recordDeserializer.getNextRecord(delegate);
+
+				if (result.isFullRecord()) {
+					outputList.add(delegate.getInstance());
+				}
+
+				if (result == RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER
+					|| result == RecordDeserializer.DeserializationResult.PARTIAL_RECORD) {
+					break;
+				}
+			}
+		} else {
+			// is event
+			AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
+			outputList.add(event);
+		}
+	}
+
 	@Override
 	public Configuration getTaskConfiguration() {
 		return this.taskConfiguration;


[2/4] flink git commit: [network] Clear serializer only once in RecordWriter#flush()

Posted by uc...@apache.org.
[network] Clear serializer only once in RecordWriter#flush()

This closes #2829.

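For illustration only (not part of the commit): a condensed sketch of the resulting flush pattern, written as a standalone helper with the writer's fields passed in as parameters (the class name is made up). The current buffer is handed to the partition unchanged, and serializer.clear() in the finally block is the single place where the serializer state is reset.

    import java.io.IOException;

    import org.apache.flink.core.io.IOReadableWritable;
    import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
    import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
    import org.apache.flink.runtime.io.network.buffer.Buffer;

    final class FlushOnceExample {
        // Flushes whatever the per-channel serializers currently hold: hand the
        // current buffer (if any) to the partition, then clear the serializer
        // exactly once in the finally block.
        static <T extends IOReadableWritable> void flush(
                RecordSerializer<T>[] serializers, ResultPartitionWriter targetPartition)
                throws IOException {
            for (int targetChannel = 0; targetChannel < serializers.length; targetChannel++) {
                RecordSerializer<T> serializer = serializers[targetChannel];
                synchronized (serializer) {
                    try {
                        Buffer buffer = serializer.getCurrentBuffer();
                        if (buffer != null) {
                            targetPartition.writeBuffer(buffer, targetChannel);
                        }
                    } finally {
                        serializer.clear();
                    }
                }
            }
        }
    }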

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4a6b0fad
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4a6b0fad
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4a6b0fad

Branch: refs/heads/master
Commit: 4a6b0fad0909509cd1c37ed6e8d9dc7ec5d28d89
Parents: 161bab0
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Nov 17 14:01:52 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Jan 9 16:22:41 2017 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/io/network/api/writer/RecordWriter.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4a6b0fad/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 87d34ff..dfa2f3d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -171,7 +171,7 @@ public class RecordWriter<T extends IOReadableWritable> {
 					Buffer buffer = serializer.getCurrentBuffer();
 
 					if (buffer != null) {
-						writeAndClearBuffer(buffer, targetChannel, serializer);
+						targetPartition.writeBuffer(buffer, targetChannel);
 					}
 				} finally {
 					serializer.clear();


[3/4] flink git commit: [javadocs, network] Add javadocs to SpanningRecordSerializer and RecordSerializer

Posted by uc...@apache.org.
[javadocs, network] Add javadocs to SpanningRecordSerializer and RecordSerializer

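For illustration only (not part of the commit): a small usage sketch of the RecordSerializer contract that the new javadocs below describe. addRecord starts serialization; while the current target buffer is full, getCurrentBuffer hands it out and setNextBuffer continues into a freshly requested buffer. The helper class is made up, collecting finished buffers into a list stands in for writing them to a result partition, and BufferProvider#requestBufferBlocking is assumed as the source of new target buffers.

    import java.io.IOException;
    import java.util.List;

    import org.apache.flink.core.io.IOReadableWritable;
    import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
    import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer.SerializationResult;
    import org.apache.flink.runtime.io.network.buffer.Buffer;
    import org.apache.flink.runtime.io.network.buffer.BufferProvider;

    final class SerializerLoopExample {
        // Emits one record: serialize it and, while the current target buffer is
        // full (or no target buffer has been set yet), retrieve the finished
        // buffer and continue writing into a newly requested one.
        static <T extends IOReadableWritable> void emit(
                RecordSerializer<T> serializer, BufferProvider bufferProvider, T record,
                List<Buffer> finishedBuffers) throws IOException, InterruptedException {
            SerializationResult result = serializer.addRecord(record);
            while (result.isFullBuffer()) {
                Buffer finished = serializer.getCurrentBuffer();
                if (finished != null) {
                    finishedBuffers.add(finished);
                }
                result = serializer.setNextBuffer(bufferProvider.requestBufferBlocking());
            }
        }
    }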

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/161bab04
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/161bab04
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/161bab04

Branch: refs/heads/master
Commit: 161bab04965b0110bfa80dcfcd1963f12b3cfda3
Parents: 384da0e
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Nov 17 13:57:18 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Jan 9 16:22:41 2017 +0100

----------------------------------------------------------------------
 .../api/serialization/RecordSerializer.java     | 72 ++++++++++++++++++--
 .../serialization/SpanningRecordSerializer.java | 16 +++++
 .../io/network/api/writer/RecordWriter.java     |  5 +-
 3 files changed, 85 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/161bab04/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
index e8179dc..5fe56c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
@@ -43,30 +43,90 @@ public interface RecordSerializer<T extends IOReadableWritable> {
 			this.isFullRecord = isFullRecord;
 			this.isFullBuffer = isFullBuffer;
 		}
-		
+
+		/**
+		 * Whether the full record was serialized and completely written to
+		 * a target buffer.
+		 *
+		 * @return <tt>true</tt> if the complete record was written
+		 */
 		public boolean isFullRecord() {
 			return this.isFullRecord;
 		}
-		
+
+		/**
+		 * Whether the target buffer is full after the serialization process.
+		 *
+		 * @return <tt>true</tt> if the target buffer is full
+		 */
 		public boolean isFullBuffer() {
 			return this.isFullBuffer;
 		}
 	}
-	
+
+	/**
+	 * Starts serializing and copying the given record to the target buffer
+	 * (if available).
+	 *
+	 * @param record the record to serialize
+	 * @return how much information was written to the target buffer and
+	 *         whether this buffer is full
+	 * @throws IOException
+	 */
 	SerializationResult addRecord(T record) throws IOException;
 
+	/**
+	 * Sets a (next) target buffer to use and continues writing remaining data
+	 * to it until it is full.
+	 *
+	 * @param buffer the new target buffer to use
+	 * @return how much information was written to the target buffer and
+	 *         whether this buffer is full
+	 * @throws IOException
+	 */
 	SerializationResult setNextBuffer(Buffer buffer) throws IOException;
 
+	/**
+	 * Retrieves the current target buffer and sets its size to the actual
+	 * number of written bytes.
+	 *
+	 * After calling this method, a new target buffer is required to continue
+	 * writing (see {@link #setNextBuffer(Buffer)}).
+	 *
+	 * @return the target buffer that was used
+	 */
 	Buffer getCurrentBuffer();
 
+	/**
+	 * Resets the target buffer to <tt>null</tt>.
+	 *
+	 * <p><strong>NOTE:</strong> After calling this method, <strong>a new target
+	 * buffer is required to continue writing</strong> (see
+	 * {@link #setNextBuffer(Buffer)}).</p>
+	 */
 	void clearCurrentBuffer();
-	
+
+	/**
+	 * Resets the target buffer to <tt>null</tt> and resets internal state set
+	 * up for the record to serialize.
+	 *
+	 * <p><strong>NOTE:</strong> After calling this method, a <strong>new record
+	 * and a new target buffer is required to start writing again</strong>
+	 * (see {@link #setNextBuffer(Buffer)}). If you want to continue
+	 * with the current record, use {@link #clearCurrentBuffer()} instead.</p>
+	 */
 	void clear();
-	
+
+	/**
+	 * Determines whether data is left, either in the current target buffer or
+	 * in any internal state set up for the record to serialize.
+	 *
+	 * @return <tt>true</tt> if some data is present
+	 */
 	boolean hasData();
 
 	/**
-	 * Insantiates all metrics.
+	 * Instantiates all metrics.
 	 *
 	 * @param metrics metric group
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/161bab04/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index a8fe3fe..335d12e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -29,6 +29,13 @@ import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.util.DataOutputSerializer;
 
+/**
+ * Record serializer which serializes the complete record to an intermediate
+ * data serialization buffer and copies this buffer to target buffers
+ * one-by-one using {@link #setNextBuffer(Buffer)}.
+ *
+ * @param <T> The type of the records that are serialized.
+ */
 public class SpanningRecordSerializer<T extends IOReadableWritable> implements RecordSerializer<T> {
 
 	/** Flag to enable/disable checks, if buffer not set/full or pending serialization */
@@ -65,6 +72,15 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 		this.lengthBuffer.position(4);
 	}
 
+	/**
+	 * Serializes the complete record to an intermediate data serialization
+	 * buffer and starts copying it to the target buffer (if available).
+	 *
+	 * @param record the record to serialize
+	 * @return how much information was written to the target buffer and
+	 *         whether this buffer is full
+	 * @throws IOException
+	 */
 	@Override
 	public SerializationResult addRecord(T record) throws IOException {
 		if (CHECKED) {

http://git-wip-us.apache.org/repos/asf/flink/blob/161bab04/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 799187d..87d34ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -208,9 +208,10 @@ public class RecordWriter<T extends IOReadableWritable> {
 	}
 
 	/**
-	 * Writes the buffer to the {@link ResultPartitionWriter}.
+	 * Writes the buffer to the {@link ResultPartitionWriter} and removes the
+	 * buffer from the serializer state.
 	 *
-	 * <p> The buffer is cleared from the serializer state after a call to this method.
+	 * Needs to be synchronized on the serializer!
 	 */
 	private void writeAndClearBuffer(
 			Buffer buffer,