You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2018/09/21 11:24:13 UTC

[flink] branch master updated: [FLINK-9913][network] Serialize records only once for multi channel writes in RecordWriter (#6417)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 914dffb  [FLINK-9913][network] Serialize records only once for multi channel writes in RecordWriter (#6417)
914dffb is described below

commit 914dffb8cec9a1c4b3012817109f767c1e87aa51
Author: zhijiang <wa...@aliyun.com>
AuthorDate: Fri Sep 21 19:24:04 2018 +0800

    [FLINK-9913][network] Serialize records only once for multi channel writes in RecordWriter (#6417)
    
    This commit improves the output serialization by serializing records only once for multiple target channels in RecordWriter, rather than serializing the record as many times as the number of selected channels.
---
 .../api/serialization/RecordSerializer.java        |  24 +++--
 .../serialization/SpanningRecordSerializer.java    |  64 +++++------
 .../io/network/api/writer/RecordWriter.java        | 117 ++++++++++++---------
 .../SpanningRecordSerializationTest.java           |  41 ++++----
 .../SpanningRecordSerializerTest.java              |  59 ++++++-----
 .../io/network/api/writer/RecordWriterTest.java    | 104 ++++++++++++++++++
 .../IteratorWrappingTestSingleInputGate.java       |   5 +-
 .../io/network/util/DeserializationUtils.java      |  61 +++++++++++
 .../consumer/StreamTestSingleInputGate.java        |   6 +-
 9 files changed, 328 insertions(+), 153 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
index 25d2927..6eebbbe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
@@ -66,29 +66,33 @@ public interface RecordSerializer<T extends IOReadableWritable> {
 	}
 
 	/**
-	 * Starts serializing and copying the given record to the target buffer
-	 * (if available).
+	 * Starts serializing the given record to an intermediate data buffer.
 	 *
 	 * @param record the record to serialize
-	 * @return how much information was written to the target buffer and
-	 *         whether this buffer is full
 	 */
-	SerializationResult addRecord(T record) throws IOException;
+	void serializeRecord(T record) throws IOException;
 
 	/**
-	 * Sets a (next) target buffer to use and continues writing remaining data
-	 * to it until it is full.
+	 * Copies the intermediate data serialization buffer to the given target buffer.
 	 *
 	 * @param bufferBuilder the new target buffer to use
 	 * @return how much information was written to the target buffer and
 	 *         whether this buffer is full
 	 */
-	SerializationResult continueWritingWithNextBufferBuilder(BufferBuilder bufferBuilder) throws IOException;
+	SerializationResult copyToBufferBuilder(BufferBuilder bufferBuilder);
+
+	/**
+	 * Checks to decrease the size of intermediate data serialization buffer after finishing the
+	 * whole serialization process including {@link #serializeRecord(IOReadableWritable)} and
+	 * {@link #copyToBufferBuilder(BufferBuilder)}.
+	 */
+	void prune();
 
 	/**
-	 * Clear and release internal state.
+	 * Supports copying an intermediate data serialization buffer to multiple target buffers
+	 * by resetting its initial position before each copying.
 	 */
-	void clear();
+	void reset();
 
 	/**
 	 * @return <tt>true</tt> if has some serialized data pending copying to the result {@link BufferBuilder}.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index c4ab53f..ba2ed01 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -20,11 +20,8 @@ package org.apache.flink.runtime.io.network.api.serialization;
 
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataOutputSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -32,7 +29,7 @@ import java.nio.ByteOrder;
 /**
  * Record serializer which serializes the complete record to an intermediate
  * data serialization buffer and copies this buffer to target buffers
- * one-by-one using {@link #continueWritingWithNextBufferBuilder(BufferBuilder)}.
+ * one-by-one using {@link #copyToBufferBuilder(BufferBuilder)}.
  *
  * @param <T> The type of the records that are serialized.
  */
@@ -50,10 +47,6 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 	/** Intermediate buffer for length serialization. */
 	private final ByteBuffer lengthBuffer;
 
-	/** Current target {@link Buffer} of the serializer. */
-	@Nullable
-	private BufferBuilder targetBuffer;
-
 	public SpanningRecordSerializer() {
 		serializationBuffer = new DataOutputSerializer(128);
 
@@ -66,15 +59,12 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 	}
 
 	/**
-	 * Serializes the complete record to an intermediate data serialization
-	 * buffer and starts copying it to the target buffer (if available).
+	 * Serializes the complete record to an intermediate data serialization buffer.
 	 *
 	 * @param record the record to serialize
-	 * @return how much information was written to the target buffer and
-	 *         whether this buffer is full
 	 */
 	@Override
-	public SerializationResult addRecord(T record) throws IOException {
+	public void serializeRecord(T record) throws IOException {
 		if (CHECKED) {
 			if (dataBuffer.hasRemaining()) {
 				throw new IllegalStateException("Pending serialization of previous record.");
@@ -91,21 +81,17 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 		lengthBuffer.putInt(0, len);
 
 		dataBuffer = serializationBuffer.wrapAsByteBuffer();
-
-		// Copy from intermediate buffers to current target memory segment
-		if (targetBuffer != null) {
-			targetBuffer.append(lengthBuffer);
-			targetBuffer.append(dataBuffer);
-			targetBuffer.commit();
-		}
-
-		return getSerializationResult();
 	}
 
+	/**
+	 * Copies an intermediate data serialization buffer into the target BufferBuilder.
+	 *
+	 * @param targetBuffer the target BufferBuilder to copy to
+	 * @return how much information was written to the target buffer and
+	 *         whether this buffer is full
+	 */
 	@Override
-	public SerializationResult continueWritingWithNextBufferBuilder(BufferBuilder buffer) throws IOException {
-		targetBuffer = buffer;
-
+	public SerializationResult copyToBufferBuilder(BufferBuilder targetBuffer) {
 		boolean mustCommit = false;
 		if (lengthBuffer.hasRemaining()) {
 			targetBuffer.append(lengthBuffer);
@@ -121,30 +107,28 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 			targetBuffer.commit();
 		}
 
-		SerializationResult result = getSerializationResult();
-
-		// make sure we don't hold onto the large buffers for too long
-		if (result.isFullRecord()) {
-			serializationBuffer.clear();
-			serializationBuffer.pruneBuffer();
-			dataBuffer = serializationBuffer.wrapAsByteBuffer();
-		}
-
-		return result;
+		return getSerializationResult(targetBuffer);
 	}
 
-	private SerializationResult getSerializationResult() {
+	private SerializationResult getSerializationResult(BufferBuilder targetBuffer) {
 		if (dataBuffer.hasRemaining() || lengthBuffer.hasRemaining()) {
 			return SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL;
 		}
 		return !targetBuffer.isFull()
-				? SerializationResult.FULL_RECORD
-				: SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL;
+			? SerializationResult.FULL_RECORD
+			: SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL;
 	}
 
 	@Override
-	public void clear() {
-		targetBuffer = null;
+	public void reset() {
+		dataBuffer.position(0);
+		lengthBuffer.position(0);
+	}
+
+	@Override
+	public void prune() {
+		serializationBuffer.pruneBuffer();
+		dataBuffer = serializationBuffer.wrapAsByteBuffer();
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 970795c..84d8183 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -58,10 +58,9 @@ public class RecordWriter<T extends IOReadableWritable> {
 
 	private final int numChannels;
 
-	/**
-	 * {@link RecordSerializer} per outgoing channel.
-	 */
-	private final RecordSerializer<T>[] serializers;
+	private final int[] broadcastChannels;
+
+	private final RecordSerializer<T> serializer;
 
 	private final Optional<BufferBuilder>[] bufferBuilders;
 
@@ -89,23 +88,17 @@ public class RecordWriter<T extends IOReadableWritable> {
 
 		this.numChannels = writer.getNumberOfSubpartitions();
 
-		/*
-		 * The runtime exposes a channel abstraction for the produced results
-		 * (see {@link ChannelSelector}). Every channel has an independent
-		 * serializer.
-		 */
-		this.serializers = new SpanningRecordSerializer[numChannels];
+		this.serializer = new SpanningRecordSerializer<T>();
 		this.bufferBuilders = new Optional[numChannels];
+		this.broadcastChannels = new int[numChannels];
 		for (int i = 0; i < numChannels; i++) {
-			serializers[i] = new SpanningRecordSerializer<T>();
+			broadcastChannels[i] = i;
 			bufferBuilders[i] = Optional.empty();
 		}
 	}
 
 	public void emit(T record) throws IOException, InterruptedException {
-		for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
-			sendToTarget(record, targetChannel);
-		}
+		emit(record, channelSelector.selectChannels(record, numChannels));
 	}
 
 	/**
@@ -113,53 +106,78 @@ public class RecordWriter<T extends IOReadableWritable> {
 	 * the {@link ChannelSelector}.
 	 */
 	public void broadcastEmit(T record) throws IOException, InterruptedException {
-		for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
-			sendToTarget(record, targetChannel);
-		}
+		emit(record, broadcastChannels);
 	}
 
 	/**
 	 * This is used to send LatencyMarks to a random target channel.
 	 */
 	public void randomEmit(T record) throws IOException, InterruptedException {
-		sendToTarget(record, rng.nextInt(numChannels));
+		serializer.serializeRecord(record);
+
+		if (copyFromSerializerToTargetChannel(rng.nextInt(numChannels))) {
+			serializer.prune();
+		}
 	}
 
-	private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
-		RecordSerializer<T> serializer = serializers[targetChannel];
+	private void emit(T record, int[] targetChannels) throws IOException, InterruptedException {
+		serializer.serializeRecord(record);
+
+		boolean pruneAfterCopying = false;
+		for (int channel : targetChannels) {
+			if (copyFromSerializerToTargetChannel(channel)) {
+				pruneAfterCopying = true;
+			}
+		}
 
-		SerializationResult result = serializer.addRecord(record);
+		// Make sure we don't hold onto the large intermediate serialization buffer for too long
+		if (pruneAfterCopying) {
+			serializer.prune();
+		}
+	}
 
+	/**
+	 * @param targetChannel
+	 * @return <tt>true</tt> if the intermediate serialization buffer should be pruned
+	 */
+	private boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException {
+		// We should reset the initial position of the intermediate serialization buffer before
+		// copying, so the serialization results can be copied to multiple target buffers.
+		serializer.reset();
+
+		boolean pruneTriggered = false;
+		BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
+		SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
 		while (result.isFullBuffer()) {
-			if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
-				// If this was a full record, we are done. Not breaking
-				// out of the loop at this point will lead to another
-				// buffer request before breaking out (that would not be
-				// a problem per se, but it can lead to stalls in the
-				// pipeline).
-				if (result.isFullRecord()) {
-					break;
-				}
+			numBytesOut.inc(bufferBuilder.finish());
+			numBuffersOut.inc();
+
+			// If this was a full record, we are done. Not breaking out of the loop at this point
+			// will lead to another buffer request before breaking out (that would not be a
+			// problem per se, but it can lead to stalls in the pipeline).
+			if (result.isFullRecord()) {
+				pruneTriggered = true;
+				bufferBuilders[targetChannel] = Optional.empty();
+				break;
 			}
-			BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);
 
-			result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder);
+			bufferBuilder = requestNewBufferBuilder(targetChannel);
+			result = serializer.copyToBufferBuilder(bufferBuilder);
 		}
 		checkState(!serializer.hasSerializedData(), "All data should be written at once");
 
 		if (flushAlways) {
 			targetPartition.flush(targetChannel);
 		}
+		return pruneTriggered;
 	}
 
 	public void broadcastEvent(AbstractEvent event) throws IOException {
 		try (BufferConsumer eventBufferConsumer = EventSerializer.toBufferConsumer(event)) {
 			for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
-				RecordSerializer<T> serializer = serializers[targetChannel];
-
-				tryFinishCurrentBufferBuilder(targetChannel, serializer);
+				tryFinishCurrentBufferBuilder(targetChannel);
 
-				// retain the buffer so that it can be recycled by each channel of targetPartition
+				// Retain the buffer so that it can be recycled by each channel of targetPartition
 				targetPartition.addBufferConsumer(eventBufferConsumer.copy(), targetChannel);
 			}
 
@@ -175,9 +193,7 @@ public class RecordWriter<T extends IOReadableWritable> {
 
 	public void clearBuffers() {
 		for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
-			RecordSerializer<?> serializer = serializers[targetChannel];
 			closeBufferBuilder(targetChannel);
-			serializer.clear();
 		}
 	}
 
@@ -191,25 +207,32 @@ public class RecordWriter<T extends IOReadableWritable> {
 
 	/**
 	 * Marks the current {@link BufferBuilder} as finished and clears the state for next one.
-	 *
-	 * @return true if some data were written
 	 */
-	private boolean tryFinishCurrentBufferBuilder(int targetChannel, RecordSerializer<T> serializer) {
-
+	private void tryFinishCurrentBufferBuilder(int targetChannel) {
 		if (!bufferBuilders[targetChannel].isPresent()) {
-			return false;
+			return;
 		}
 		BufferBuilder bufferBuilder = bufferBuilders[targetChannel].get();
 		bufferBuilders[targetChannel] = Optional.empty();
-
 		numBytesOut.inc(bufferBuilder.finish());
 		numBuffersOut.inc();
-		serializer.clear();
-		return true;
+	}
+
+	/**
+	 * The {@link BufferBuilder} may already exist if not filled up last time, otherwise we need
+	 * request a new one for this target channel.
+	 */
+	private BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
+		if (bufferBuilders[targetChannel].isPresent()) {
+			return bufferBuilders[targetChannel].get();
+		} else {
+			return requestNewBufferBuilder(targetChannel);
+		}
 	}
 
 	private BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
-		checkState(!bufferBuilders[targetChannel].isPresent());
+		checkState(!bufferBuilders[targetChannel].isPresent() || bufferBuilders[targetChannel].get().isFinished());
+
 		BufferBuilder bufferBuilder = targetPartition.getBufferProvider().requestBufferBuilderBlocking();
 		bufferBuilders[targetChannel] = Optional.of(bufferBuilder);
 		targetPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(), targetChannel);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
index 2e1063f..a17008a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.serialization.types.LargeObjectType;
+import org.apache.flink.runtime.io.network.util.DeserializationUtils;
 import org.apache.flink.testutils.serialization.types.IntType;
 import org.apache.flink.testutils.serialization.types.SerializationTestType;
 import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
@@ -134,7 +135,7 @@ public class SpanningRecordSerializationTest extends TestLogger {
 
 		// -------------------------------------------------------------------------------------------------------------
 
-		BufferConsumerAndSerializerResult serializationResult = setNextBufferForSerializer(serializer, segmentSize);
+		BufferAndSerializerResult serializationResult = setNextBufferForSerializer(serializer, segmentSize);
 
 		int numRecords = 0;
 		for (SerializationTestType record : records) {
@@ -144,27 +145,16 @@ public class SpanningRecordSerializationTest extends TestLogger {
 			numRecords++;
 
 			// serialize record
-			if (serializer.addRecord(record).isFullBuffer()) {
+			serializer.serializeRecord(record);
+			if (serializer.copyToBufferBuilder(serializationResult.getBufferBuilder()).isFullBuffer()) {
 				// buffer is full => start deserializing
 				deserializer.setNextBuffer(serializationResult.buildBuffer());
 
-				while (!serializedRecords.isEmpty()) {
-					SerializationTestType expected = serializedRecords.poll();
-					SerializationTestType actual = expected.getClass().newInstance();
-
-					if (deserializer.getNextRecord(actual).isFullRecord()) {
-						Assert.assertEquals(expected, actual);
-						numRecords--;
-					} else {
-						serializedRecords.addFirst(expected);
-						break;
-					}
-				}
+				numRecords -= DeserializationUtils.deserializeRecords(serializedRecords, deserializer);
 
 				// move buffers as long as necessary (for long records)
 				while ((serializationResult = setNextBufferForSerializer(serializer, segmentSize)).isFullBuffer()) {
 					deserializer.setNextBuffer(serializationResult.buildBuffer());
-					serializer.clear();
 				}
 			}
 		}
@@ -189,7 +179,7 @@ public class SpanningRecordSerializationTest extends TestLogger {
 		Assert.assertFalse(deserializer.hasUnfinishedData());
 	}
 
-	private static BufferConsumerAndSerializerResult setNextBufferForSerializer(
+	private static BufferAndSerializerResult setNextBufferForSerializer(
 			RecordSerializer<SerializationTestType> serializer,
 			int segmentSize) throws IOException {
 		// create a bufferBuilder with some random starting offset to properly test handling buffer slices in the
@@ -199,21 +189,30 @@ public class SpanningRecordSerializationTest extends TestLogger {
 		BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();
 		bufferConsumer.build().recycleBuffer();
 
-		serializer.clear();
-		return new BufferConsumerAndSerializerResult(
+		return new BufferAndSerializerResult(
+			bufferBuilder,
 			bufferConsumer,
-			serializer.continueWritingWithNextBufferBuilder(bufferBuilder));
+			serializer.copyToBufferBuilder(bufferBuilder));
 	}
 
-	private static class BufferConsumerAndSerializerResult {
+	private static class BufferAndSerializerResult {
+		private final BufferBuilder bufferBuilder;
 		private final BufferConsumer bufferConsumer;
 		private final RecordSerializer.SerializationResult serializationResult;
 
-		public BufferConsumerAndSerializerResult(BufferConsumer bufferConsumer, RecordSerializer.SerializationResult serializationResult) {
+		public BufferAndSerializerResult(
+				BufferBuilder bufferBuilder,
+				BufferConsumer bufferConsumer,
+				RecordSerializer.SerializationResult serializationResult) {
+			this.bufferBuilder = bufferBuilder;
 			this.bufferConsumer = bufferConsumer;
 			this.serializationResult = serializationResult;
 		}
 
+		public BufferBuilder getBufferBuilder() {
+			return bufferBuilder;
+		}
+
 		public Buffer buildBuffer() {
 			return buildSingleBuffer(bufferConsumer);
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
index c39b54a..e5f5dfc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.api.serialization;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.testutils.serialization.types.SerializationTestType;
 import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
 import org.apache.flink.testutils.serialization.types.Util;
@@ -40,25 +41,25 @@ public class SpanningRecordSerializerTest {
 
 	@Test
 	public void testHasSerializedData() throws IOException {
-		final int segmentSize = 16;
-
 		final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
 		final SerializationTestType randomIntRecord = Util.randomRecord(SerializationTestTypeFactory.INT);
 
 		Assert.assertFalse(serializer.hasSerializedData());
 
-		serializer.addRecord(randomIntRecord);
+		serializer.serializeRecord(randomIntRecord);
 		Assert.assertTrue(serializer.hasSerializedData());
 
-		serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize));
+		final BufferBuilder bufferBuilder1 = createBufferBuilder(16);
+		serializer.copyToBufferBuilder(bufferBuilder1);
 		Assert.assertFalse(serializer.hasSerializedData());
 
-		serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(8));
-
-		serializer.addRecord(randomIntRecord);
+		final BufferBuilder bufferBuilder2 = createBufferBuilder(8);
+		serializer.reset();
+		serializer.copyToBufferBuilder(bufferBuilder2);
 		Assert.assertFalse(serializer.hasSerializedData());
 
-		serializer.addRecord(randomIntRecord);
+		serializer.reset();
+		serializer.copyToBufferBuilder(bufferBuilder2);
 		// Buffer builder full!
 		Assert.assertTrue(serializer.hasSerializedData());
 	}
@@ -68,15 +69,10 @@ public class SpanningRecordSerializerTest {
 		final int segmentSize = 11;
 
 		final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
+		final BufferBuilder bufferBuilder1 = createBufferBuilder(segmentSize);
 
-		try {
-			Assert.assertEquals(
-				RecordSerializer.SerializationResult.FULL_RECORD,
-				serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize)));
-		} catch (IOException e) {
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
+		Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD,
+			serializer.copyToBufferBuilder(bufferBuilder1));
 
 		SerializationTestType emptyRecord = new SerializationTestType() {
 			@Override
@@ -106,17 +102,19 @@ public class SpanningRecordSerializerTest {
 			}
 		};
 
-		RecordSerializer.SerializationResult result = serializer.addRecord(emptyRecord);
-		Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
+		serializer.serializeRecord(emptyRecord);
+		Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, serializer.copyToBufferBuilder(bufferBuilder1));
 
-		result = serializer.addRecord(emptyRecord);
-		Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
+		serializer.reset();
+		Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, serializer.copyToBufferBuilder(bufferBuilder1));
 
-		result = serializer.addRecord(emptyRecord);
-		Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL, result);
+		serializer.reset();
+		Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL,
+			serializer.copyToBufferBuilder(bufferBuilder1));
 
-		result = serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize));
-		Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
+		final BufferBuilder bufferBuilder2 = createBufferBuilder(segmentSize);
+		Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD,
+			serializer.copyToBufferBuilder(bufferBuilder2));
 	}
 
 	@Test
@@ -169,26 +167,29 @@ public class SpanningRecordSerializerTest {
 
 		// -------------------------------------------------------------------------------------------------------------
 
-		serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize));
-
+		BufferBuilder bufferBuilder = createBufferBuilder(segmentSize);
 		int numBytes = 0;
 		for (SerializationTestType record : records) {
-			RecordSerializer.SerializationResult result = serializer.addRecord(record);
+			serializer.serializeRecord(record);
+			RecordSerializer.SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
 			numBytes += record.length() + serializationOverhead;
 
 			if (numBytes < segmentSize) {
 				Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
 			} else if (numBytes == segmentSize) {
 				Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL, result);
-				serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize));
+				bufferBuilder = createBufferBuilder(segmentSize);
 				numBytes = 0;
 			} else {
 				Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL, result);
 
 				while (result.isFullBuffer()) {
 					numBytes -= segmentSize;
-					result = serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize));
+					bufferBuilder = createBufferBuilder(segmentSize);
+					result = serializer.copyToBufferBuilder(bufferBuilder);
 				}
+
+				Assert.assertTrue(result.isFullRecord());
 			}
 		}
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 0b0a236..ed9f4cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -28,7 +28,9 @@ import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer.SerializationResult;
+import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
@@ -36,11 +38,18 @@ import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.util.DeserializationUtils;
 import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
+import org.apache.flink.testutils.serialization.types.SerializationTestType;
+import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
+import org.apache.flink.testutils.serialization.types.Util;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.util.XORShiftRandom;
 
+import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -70,6 +79,9 @@ import static org.mockito.Mockito.when;
  */
 public class RecordWriterTest {
 
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
 	// ---------------------------------------------------------------------------------------------
 	// Resource release tests
 	// ---------------------------------------------------------------------------------------------
@@ -377,6 +389,77 @@ public class RecordWriterTest {
 		assertEquals("Buffer 2 shares the same reader index as buffer 1", 0, buffer2.getReaderIndex());
 	}
 
+	/**
+	 * Tests that records are broadcast via {@link ChannelSelector} and
+	 * {@link RecordWriter#emit(IOReadableWritable)}.
+	 */
+	@Test
+	public void testEmitRecordWithBroadcastPartitioner() throws Exception {
+		emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false);
+	}
+
+	/**
+	 * Tests that records are broadcast via {@link RecordWriter#broadcastEmit(IOReadableWritable)}.
+	 */
+	@Test
+	public void testBroadcastEmitRecord() throws Exception {
+		emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true);
+	}
+
+	/**
+	 * The results of emitting records via BroadcastPartitioner or broadcasting records directly are the same,
+	 * that is all the target channels can receive the whole outputs.
+	 *
+	 * @param isBroadcastEmit whether using {@link RecordWriter#broadcastEmit(IOReadableWritable)} or not
+	 */
+	private void emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean isBroadcastEmit) throws Exception {
+		final int numChannels = 4;
+		final int bufferSize = 32;
+		final int numValues = 8;
+		final int serializationLength = 4;
+
+		@SuppressWarnings("unchecked")
+		final Queue<BufferConsumer>[] queues = new Queue[numChannels];
+		for (int i = 0; i < numChannels; i++) {
+			queues[i] = new ArrayDeque<>();
+		}
+
+		final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
+		final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
+		final RecordWriter<SerializationTestType> writer = isBroadcastEmit ?
+			new RecordWriter<>(partitionWriter) :
+			new RecordWriter<>(partitionWriter, new Broadcast<>());
+		final RecordDeserializer<SerializationTestType> deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>(
+			new String[]{ tempFolder.getRoot().getAbsolutePath() });
+
+		final ArrayDeque<SerializationTestType> serializedRecords = new ArrayDeque<>();
+		final Iterable<SerializationTestType> records = Util.randomRecords(numValues, SerializationTestTypeFactory.INT);
+		for (SerializationTestType record : records) {
+			serializedRecords.add(record);
+
+			if (isBroadcastEmit) {
+				writer.broadcastEmit(record);
+			} else {
+				writer.emit(record);
+			}
+		}
+
+		final int requiredBuffers = numValues / (bufferSize / (4 + serializationLength));
+		for (int i = 0; i < numChannels; i++) {
+			assertEquals(requiredBuffers, queues[i].size());
+
+			final ArrayDeque<SerializationTestType> expectedRecords = serializedRecords.clone();
+			int assertRecords = 0;
+			for (int j = 0; j < requiredBuffers; j++) {
+				Buffer buffer = buildSingleBuffer(queues[i].remove());
+				deserializer.setNextBuffer(buffer);
+
+				assertRecords += DeserializationUtils.deserializeRecords(expectedRecords, deserializer);
+			}
+			Assert.assertEquals(numValues, assertRecords);
+		}
+	}
+
 	// ---------------------------------------------------------------------------------------------
 	// Helpers
 	// ---------------------------------------------------------------------------------------------
@@ -524,6 +607,27 @@ public class RecordWriterTest {
 		}
 	}
 
+	/**
+	 * Broadcast channel selector that selects all the output channels.
+	 *
+	 * <p>Used by the test to compare {@code emit(record)} routed through a
+	 * broadcast partitioner against the dedicated {@code broadcastEmit(record)}
+	 * path of the {@code RecordWriter}: both must write to every channel.
+	 */
+	private static class Broadcast<T extends IOReadableWritable> implements ChannelSelector<T> {
+
+		// Cached array of all channel indexes; rebuilt only when the channel count changes.
+		private int[] returnChannel;
+
+		@Override
+		public int[] selectChannels(final T record, final int numberOfOutputChannels) {
+			// Reuse the cached array if it still matches the current number of channels.
+			if (returnChannel != null && returnChannel.length == numberOfOutputChannels) {
+				return returnChannel;
+			} else {
+				// (Re)build the identity mapping [0, 1, ..., numberOfOutputChannels - 1].
+				this.returnChannel = new int[numberOfOutputChannels];
+				for (int i = 0; i < numberOfOutputChannels; i++) {
+					returnChannel[i] = i;
+				}
+				return returnChannel;
+			}
+		}
+	}
+
 	private static class TrackingBufferRecycler implements BufferRecycler {
 		private final ArrayList<MemorySegment> recycledMemorySegments = new ArrayList<>();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
index a67df0b..d691f3e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
@@ -75,10 +75,9 @@ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> e
 			@Override
 			public Optional<BufferAndAvailability> getBufferAvailability() throws IOException {
 				if (hasData) {
-					serializer.clear();
+					serializer.serializeRecord(reuse);
 					BufferBuilder bufferBuilder = createBufferBuilder(bufferSize);
-					serializer.continueWritingWithNextBufferBuilder(bufferBuilder);
-					serializer.addRecord(reuse);
+					serializer.copyToBufferBuilder(bufferBuilder);
 
 					hasData = inputIterator.next(reuse) != null;
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/DeserializationUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/DeserializationUtils.java
new file mode 100644
index 0000000..da10323
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/DeserializationUtils.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.util;
+
+import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import org.apache.flink.testutils.serialization.types.SerializationTestType;
+
+import org.junit.Assert;
+
+import java.util.ArrayDeque;
+
+/**
+ * Utility class to help deserialization for testing.
+ */
+public final class DeserializationUtils {
+
+	/**
+	 * Iterates over the provided records to deserialize, verifies the results and counts
+	 * the number of full records.
+	 *
+	 * <p>Deserialization stops at the first partial record (the current buffer is
+	 * exhausted mid-record); the pending expected record is pushed back to the front
+	 * of the queue so the caller can resume after feeding the next buffer to the
+	 * deserializer.
+	 *
+	 * @param records records to be deserialized
+	 * @param deserializer the record deserializer
+	 * @return the number of full deserialized records
+	 * @throws Exception if deserialization or reflective instantiation of a record fails
+	 */
+	public static int deserializeRecords(
+			ArrayDeque<SerializationTestType> records,
+			RecordDeserializer<SerializationTestType> deserializer) throws Exception {
+		int deserializedRecords = 0;
+
+		while (!records.isEmpty()) {
+			SerializationTestType expected = records.poll();
+			// Fresh instance of the same concrete type to deserialize into.
+			SerializationTestType actual = expected.getClass().newInstance();
+
+			if (deserializer.getNextRecord(actual).isFullRecord()) {
+				Assert.assertEquals(expected, actual);
+				deserializedRecords++;
+			} else {
+				// Partial record: restore the expected value for the next buffer round.
+				records.addFirst(expected);
+				break;
+			}
+		}
+
+		return deserializedRecords;
+	}
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index ea38382..dbb81ab 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -105,10 +105,10 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
 				} else if (input != null && input.isStreamRecord()) {
 					Object inputElement = input.getStreamRecord();
 
-					BufferBuilder bufferBuilder = createBufferBuilder(bufferSize);
-					recordSerializer.continueWritingWithNextBufferBuilder(bufferBuilder);
 					delegate.setInstance(inputElement);
-					recordSerializer.addRecord(delegate);
+					recordSerializer.serializeRecord(delegate);
+					BufferBuilder bufferBuilder = createBufferBuilder(bufferSize);
+					recordSerializer.copyToBufferBuilder(bufferBuilder);
 					bufferBuilder.finish();
 
 					// Call getCurrentBuffer to ensure size is set