You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/01/23 17:38:00 UTC

[flink] branch master updated: [FLINK-11282][network] Merge StreamRecordWriter into RecordWriter (#7438)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b12d392  [FLINK-11282][network] Merge StreamRecordWriter into RecordWriter (#7438)
b12d392 is described below

commit b12d392e7c1e7e2baca995d475fa259efc6af246
Author: zhijiang <wa...@aliyun.com>
AuthorDate: Thu Jan 24 01:37:51 2019 +0800

    [FLINK-11282][network] Merge StreamRecordWriter into RecordWriter (#7438)
    
    [FLINK-11282][network] Merge StreamRecordWriter into RecordWriter
---
 .../io/network/api/writer/RecordWriter.java        | 132 ++++++++++++++-
 .../streaming/runtime/io/RecordWriterOutput.java   |   8 +-
 .../streaming/runtime/io/StreamRecordWriter.java   | 181 ---------------------
 .../streaming/runtime/tasks/OperatorChain.java     |  10 +-
 .../flink/streaming/runtime/tasks/StreamTask.java  |  28 ++--
 .../io/benchmark/LongRecordWriterThread.java       |   5 +-
 .../StreamNetworkBenchmarkEnvironment.java         |   6 +-
 .../StreamNetworkPointToPointBenchmark.java        |   2 +-
 .../operators/StreamOperatorChainingTest.java      |   2 +-
 9 files changed, 155 insertions(+), 219 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 6a691c3..d1f8d3d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -30,11 +30,15 @@ import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.util.XORShiftRandom;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.util.Optional;
 import java.util.Random;
 
 import static org.apache.flink.runtime.io.network.api.serialization.RecordSerializer.SerializationResult;
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
@@ -52,7 +56,9 @@ import static org.apache.flink.util.Preconditions.checkState;
  */
 public class RecordWriter<T extends IOReadableWritable> {
 
-	protected final ResultPartitionWriter targetPartition;
+	private static final Logger LOG = LoggerFactory.getLogger(RecordWriter.class);
+
+	private final ResultPartitionWriter targetPartition;
 
 	private final ChannelSelector<T> channelSelector;
 
@@ -66,23 +72,35 @@ public class RecordWriter<T extends IOReadableWritable> {
 
 	private final Random rng = new XORShiftRandom();
 
-	private final boolean flushAlways;
-
 	private Counter numBytesOut = new SimpleCounter();
 
 	private Counter numBuffersOut = new SimpleCounter();
 
+	private final boolean flushAlways;
+
+	/** Default name for the output flush thread, if no name with a task reference is given. */
+	private static final String DEFAULT_OUTPUT_FLUSH_THREAD_NAME = "OutputFlusher";
+
+	/** The thread that periodically flushes the output, to give an upper latency bound. */
+	private final Optional<OutputFlusher> outputFlusher;
+
+	/** To avoid synchronization overhead on the critical path, best-effort error tracking is enough here. */
+	private Throwable flusherException;
+
 	public RecordWriter(ResultPartitionWriter writer) {
 		this(writer, new RoundRobinChannelSelector<T>());
 	}
 
 	@SuppressWarnings("unchecked")
 	public RecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector) {
-		this(writer, channelSelector, false);
+		this(writer, channelSelector, -1, null);
 	}
 
-	public RecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector, boolean flushAlways) {
-		this.flushAlways = flushAlways;
+	public RecordWriter(
+			ResultPartitionWriter writer,
+			ChannelSelector<T> channelSelector,
+			long timeout,
+			String taskName) {
 		this.targetPartition = writer;
 		this.channelSelector = channelSelector;
 		this.numberOfChannels = writer.getNumberOfSubpartitions();
@@ -95,9 +113,23 @@ public class RecordWriter<T extends IOReadableWritable> {
 			broadcastChannels[i] = i;
 			bufferBuilders[i] = Optional.empty();
 		}
+
+		checkArgument(timeout >= -1);
+		this.flushAlways = (timeout == 0);
+		if (timeout == -1 || timeout == 0) {
+			outputFlusher = Optional.empty();
+		} else {
+			String threadName = taskName == null ?
+				DEFAULT_OUTPUT_FLUSH_THREAD_NAME :
+				DEFAULT_OUTPUT_FLUSH_THREAD_NAME + " for " + taskName;
+
+			outputFlusher = Optional.of(new OutputFlusher(threadName, timeout));
+			outputFlusher.get().start();
+		}
 	}
 
 	public void emit(T record) throws IOException, InterruptedException {
+		checkErroneous();
 		emit(record, channelSelector.selectChannels(record));
 	}
 
@@ -106,6 +138,7 @@ public class RecordWriter<T extends IOReadableWritable> {
 	 * the {@link ChannelSelector}.
 	 */
 	public void broadcastEmit(T record) throws IOException, InterruptedException {
+		checkErroneous();
 		emit(record, broadcastChannels);
 	}
 
@@ -113,8 +146,8 @@ public class RecordWriter<T extends IOReadableWritable> {
 	 * This is used to send LatencyMarks to a random target channel.
 	 */
 	public void randomEmit(T record) throws IOException, InterruptedException {
+		checkErroneous();
 		serializer.serializeRecord(record);
-
 		if (copyFromSerializerToTargetChannel(rng.nextInt(numberOfChannels))) {
 			serializer.prune();
 		}
@@ -245,4 +278,89 @@ public class RecordWriter<T extends IOReadableWritable> {
 			bufferBuilders[targetChannel] = Optional.empty();
 		}
 	}
+
+	/**
+	 * Closes the writer. This stops the flushing thread (if there is one).
+	 */
+	public void close() {
+		clearBuffers();
+		// make sure we terminate the thread in any case
+		if (outputFlusher.isPresent()) {
+			outputFlusher.get().terminate();
+			try {
+				outputFlusher.get().join();
+			} catch (InterruptedException e) {
+				// ignore on close
+				// restore interrupt flag to fast exit further blocking calls
+				Thread.currentThread().interrupt();
+			}
+		}
+	}
+
+	/**
+	 * Notifies the writer that the output flusher thread encountered an exception.
+	 *
+	 * @param t The exception to report.
+	 */
+	private void notifyFlusherException(Throwable t) {
+		if (flusherException == null) {
+			LOG.error("An exception happened while flushing the outputs", t);
+			flusherException = t;
+		}
+	}
+
+	private void checkErroneous() throws IOException {
+		if (flusherException != null) {
+			throw new IOException("An exception happened while flushing the outputs", flusherException);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+
+	/**
+	 * A dedicated thread that periodically flushes the output buffers, to set upper latency bounds.
+	 *
+	 * <p>The thread is daemonic, because it is only a utility thread.
+	 */
+	private class OutputFlusher extends Thread {
+
+		private final long timeout;
+
+		private volatile boolean running = true;
+
+		OutputFlusher(String name, long timeout) {
+			super(name);
+			setDaemon(true);
+			this.timeout = timeout;
+		}
+
+		public void terminate() {
+			running = false;
+			interrupt();
+		}
+
+		@Override
+		public void run() {
+			try {
+				while (running) {
+					try {
+						Thread.sleep(timeout);
+					} catch (InterruptedException e) {
+						// propagate this if we are still running, because it should not happen
+						// in that case
+						if (running) {
+							throw new Exception(e);
+						}
+					}
+
+					// any errors here should let the thread come to a halt and be
+					// recognized by the writer
+					flushAll();
+				}
+			} catch (Throwable t) {
+				notifyFlusherException(t);
+			}
+		}
+	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index d2b4d1f..31fbcef 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -45,7 +45,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public class RecordWriterOutput<OUT> implements OperatorChain.WatermarkGaugeExposingOutput<StreamRecord<OUT>> {
 
-	private StreamRecordWriter<SerializationDelegate<StreamElement>> recordWriter;
+	private RecordWriter<SerializationDelegate<StreamElement>> recordWriter;
 
 	private SerializationDelegate<StreamElement> serializationDelegate;
 
@@ -57,7 +57,7 @@ public class RecordWriterOutput<OUT> implements OperatorChain.WatermarkGaugeExpo
 
 	@SuppressWarnings("unchecked")
 	public RecordWriterOutput(
-			StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
+			RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
 			TypeSerializer<OUT> outSerializer,
 			OutputTag outputTag,
 			StreamStatusProvider streamStatusProvider) {
@@ -66,8 +66,8 @@ public class RecordWriterOutput<OUT> implements OperatorChain.WatermarkGaugeExpo
 		this.outputTag = outputTag;
 		// generic hack: cast the writer to generic Object type so we can use it
 		// with multiplexed records and watermarks
-		this.recordWriter = (StreamRecordWriter<SerializationDelegate<StreamElement>>)
-				(StreamRecordWriter<?>) recordWriter;
+		this.recordWriter = (RecordWriter<SerializationDelegate<StreamElement>>)
+				(RecordWriter<?>) recordWriter;
 
 		TypeSerializer<StreamElement> outRecordSerializer =
 				new StreamElementSerializer<>(outSerializer);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
deleted file mode 100644
index acda489..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-
-import java.io.IOException;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-
-/**
- * This record writer keeps data in buffers at most for a certain timeout. It spawns a separate thread
- * that flushes the outputs in a defined interval, to make sure data does not linger in the buffers for too long.
- *
- * @param <T> The type of elements written.
- */
-@Internal
-public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
-
-	/** Default name for the output flush thread, if no name with a task reference is given. */
-	private static final String DEFAULT_OUTPUT_FLUSH_THREAD_NAME = "OutputFlusher";
-
-
-	/** The thread that periodically flushes the output, to give an upper latency bound. */
-	private final OutputFlusher outputFlusher;
-
-	/** The exception encountered in the flushing thread. */
-	private Throwable flusherException;
-
-	public StreamRecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector, long timeout) {
-		this(writer, channelSelector, timeout, null);
-	}
-
-	public StreamRecordWriter(
-			ResultPartitionWriter writer,
-			ChannelSelector<T> channelSelector,
-			long timeout,
-			String taskName) {
-		super(writer, channelSelector, timeout == 0);
-
-		checkArgument(timeout >= -1);
-
-		if (timeout == -1) {
-			outputFlusher = null;
-		}
-		else if (timeout == 0) {
-			outputFlusher = null;
-		}
-		else {
-			String threadName = taskName == null ?
-				DEFAULT_OUTPUT_FLUSH_THREAD_NAME :
-				DEFAULT_OUTPUT_FLUSH_THREAD_NAME + " for " + taskName;
-
-			outputFlusher = new OutputFlusher(threadName, timeout);
-			outputFlusher.start();
-		}
-	}
-
-	@Override
-	public void emit(T record) throws IOException, InterruptedException {
-		checkErroneous();
-		super.emit(record);
-	}
-
-	@Override
-	public void broadcastEmit(T record) throws IOException, InterruptedException {
-		checkErroneous();
-		super.broadcastEmit(record);
-	}
-
-	@Override
-	public void randomEmit(T record) throws IOException, InterruptedException {
-		checkErroneous();
-		super.randomEmit(record);
-	}
-
-	/**
-	 * Closes the writer. This stops the flushing thread (if there is one).
-	 */
-	public void close() {
-		clearBuffers();
-		// make sure we terminate the thread in any case
-		if (outputFlusher != null) {
-			outputFlusher.terminate();
-			try {
-				outputFlusher.join();
-			}
-			catch (InterruptedException e) {
-				// ignore on close
-				// restore interrupt flag to fast exit further blocking calls
-				Thread.currentThread().interrupt();
-			}
-		}
-	}
-
-	/**
-	 * Notifies the writer that the output flusher thread encountered an exception.
-	 *
-	 * @param t The exception to report.
-	 */
-	private void notifyFlusherException(Throwable t) {
-		if (this.flusherException == null) {
-			this.flusherException = t;
-		}
-	}
-
-	private void checkErroneous() throws IOException {
-		if (flusherException != null) {
-			throw new IOException("An exception happened while flushing the outputs", flusherException);
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * A dedicated thread that periodically flushes the output buffers, to set upper latency bounds.
-	 *
-	 * <p>The thread is daemonic, because it is only a utility thread.
-	 */
-	private class OutputFlusher extends Thread {
-
-		private final long timeout;
-
-		private volatile boolean running = true;
-
-		OutputFlusher(String name, long timeout) {
-			super(name);
-			setDaemon(true);
-			this.timeout = timeout;
-		}
-
-		public void terminate() {
-			running = false;
-			interrupt();
-		}
-
-		@Override
-		public void run() {
-			try {
-				while (running) {
-					try {
-						Thread.sleep(timeout);
-					}
-					catch (InterruptedException e) {
-						// propagate this if we are still running, because it should not happen
-						// in that case
-						if (running) {
-							throw new Exception(e);
-						}
-					}
-
-					// any errors here should let the thread come to a halt and be
-					// recognized by the writer
-					flushAll();
-				}
-			}
-			catch (Throwable t) {
-				notifyFlusherException(t);
-			}
-		}
-	}
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index d49b023..d13ab01 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
@@ -42,7 +43,6 @@ import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
-import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
 import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -96,7 +96,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 
 	public OperatorChain(
 			StreamTask<OUT, OP> containingTask,
-			List<StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>> streamRecordWriters) {
+			List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters) {
 
 		final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
 		final StreamConfig configuration = containingTask.getConfiguration();
@@ -119,7 +119,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 				StreamEdge outEdge = outEdgesInOrder.get(i);
 
 				RecordWriterOutput<?> streamOutput = createStreamOutput(
-					streamRecordWriters.get(i),
+					recordWriters.get(i),
 					outEdge,
 					chainedConfigs.get(outEdge.getSourceId()),
 					containingTask.getEnvironment());
@@ -389,7 +389,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 	}
 
 	private RecordWriterOutput<OUT> createStreamOutput(
-			StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> streamRecordWriter,
+			RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
 			StreamEdge edge,
 			StreamConfig upStreamConfig,
 			Environment taskEnvironment) {
@@ -406,7 +406,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 			outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
 		}
 
-		return new RecordWriterOutput<>(streamRecordWriter, outSerializer, sideOutputTag, this);
+		return new RecordWriterOutput<>(recordWriter, outSerializer, sideOutputTag, this);
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 9ee8892..a38886e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -49,7 +50,6 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
-import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
 import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -177,7 +177,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	/** Wrapper for synchronousCheckpointExceptionHandler to deal with rethrown exceptions. Used in the async part. */
 	private AsyncCheckpointExceptionHandler asynchronousCheckpointExceptionHandler;
 
-	private final List<StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>> streamRecordWriters;
+	private final List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters;
 
 	// ------------------------------------------------------------------------
 
@@ -209,7 +209,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		this.timerService = timeProvider;
 		this.configuration = new StreamConfig(getTaskConfiguration());
 		this.accumulatorMap = getEnvironment().getAccumulatorRegistry().getUserMap();
-		this.streamRecordWriters = createStreamRecordWriters(configuration, environment);
+		this.recordWriters = createRecordWriters(configuration, environment);
 	}
 
 	// ------------------------------------------------------------------------
@@ -264,7 +264,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 				timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory);
 			}
 
-			operatorChain = new OperatorChain<>(this, streamRecordWriters);
+			operatorChain = new OperatorChain<>(this, recordWriters);
 			headOperator = operatorChain.getHeadOperator();
 
 			// task specific initialization
@@ -650,9 +650,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 				final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
 				Exception exception = null;
 
-				for (StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> streamRecordWriter : streamRecordWriters) {
+				for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter : recordWriters) {
 					try {
-						streamRecordWriter.broadcastEvent(message);
+						recordWriter.broadcastEvent(message);
 					} catch (Exception e) {
 						exception = ExceptionUtils.firstOrSuppressed(
 							new Exception("Could not send cancel checkpoint marker to downstream tasks.", e),
@@ -1157,27 +1157,27 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	}
 
 	@VisibleForTesting
-	public static <OUT> List<StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createStreamRecordWriters(
+	public static <OUT> List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters(
 			StreamConfig configuration,
 			Environment environment) {
-		List<StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>> streamRecordWriters = new ArrayList<>();
+		List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters = new ArrayList<>();
 		List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(environment.getUserClassLoader());
 		Map<Integer, StreamConfig> chainedConfigs = configuration.getTransitiveChainedTaskConfigsWithSelf(environment.getUserClassLoader());
 
 		for (int i = 0; i < outEdgesInOrder.size(); i++) {
 			StreamEdge edge = outEdgesInOrder.get(i);
-			streamRecordWriters.add(
-				createStreamRecordWriter(
+			recordWriters.add(
+				createRecordWriter(
 					edge,
 					i,
 					environment,
 					environment.getTaskInfo().getTaskName(),
 					chainedConfigs.get(edge.getSourceId()).getBufferTimeout()));
 		}
-		return streamRecordWriters;
+		return recordWriters;
 	}
 
-	private static <OUT> StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> createStreamRecordWriter(
+	private static <OUT> RecordWriter<SerializationDelegate<StreamRecord<OUT>>> createRecordWriter(
 			StreamEdge edge,
 			int outputIndex,
 			Environment environment,
@@ -1198,8 +1198,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			}
 		}
 
-		StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> output =
-			new StreamRecordWriter<>(bufferWriter, outputPartitioner, bufferTimeout, taskName);
+		RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output =
+			new RecordWriter<>(bufferWriter, outputPartitioner, bufferTimeout, taskName);
 		output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
 		return output;
 	}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java
index ba3294b..f72c3b1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.runtime.io.benchmark;
 
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
 import org.apache.flink.types.LongValue;
 
 import java.io.IOException;
@@ -34,7 +33,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  * records.
  */
 public class LongRecordWriterThread extends CheckedThread {
-	private final StreamRecordWriter<LongValue> recordWriter;
+	private final RecordWriter<LongValue> recordWriter;
 	private final boolean broadcastMode;
 
 	/**
@@ -45,7 +44,7 @@ public class LongRecordWriterThread extends CheckedThread {
 	private volatile boolean running = true;
 
 	public LongRecordWriterThread(
-			StreamRecordWriter<LongValue> recordWriter,
+			RecordWriter<LongValue> recordWriter,
 			boolean broadcastMode) {
 		this.recordWriter = checkNotNull(recordWriter);
 		this.broadcastMode = broadcastMode;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index 941b937..3a69875 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
@@ -52,7 +53,6 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -182,9 +182,9 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 		return receiver;
 	}
 
-	public StreamRecordWriter<T> createRecordWriter(int partitionIndex, long flushTimeout) throws Exception {
+	public RecordWriter<T> createRecordWriter(int partitionIndex, long flushTimeout) throws Exception {
 		ResultPartitionWriter sender = createResultPartition(jobId, partitionIds[partitionIndex], senderEnv, channels);
-		return new StreamRecordWriter<>(sender,  new RoundRobinChannelSelector<T>(), flushTimeout);
+		return new RecordWriter<>(sender, new RoundRobinChannelSelector<T>(), flushTimeout, null);
 	}
 
 	private void generatePartitionIds() throws Exception {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
index bf6fdc4..7488688 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
@@ -71,7 +71,7 @@ public class StreamNetworkPointToPointBenchmark {
 	 *
 	 * @param flushTimeout
 	 * 		output flushing interval of the
-	 * 		{@link org.apache.flink.streaming.runtime.io.StreamRecordWriter}'s output flusher thread
+	 * 		{@link org.apache.flink.runtime.io.network.api.writer.RecordWriter}'s output flusher thread
 	 */
 	public void setUp(long flushTimeout, Configuration config) throws Exception {
 		environment = new StreamNetworkBenchmarkEnvironment<>();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
index 9c3b08f..61a6a66 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
@@ -269,7 +269,7 @@ public class StreamOperatorChainingTest {
 			StreamConfig streamConfig,
 			Environment environment,
 			StreamTask<IN, OT> task) {
-		return new OperatorChain<>(task, StreamTask.createStreamRecordWriters(streamConfig, environment));
+		return new OperatorChain<>(task, StreamTask.createRecordWriters(streamConfig, environment));
 	}
 
 	private <IN, OT extends StreamOperator<IN>> StreamTask<IN, OT> createMockTask(