You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/01/23 17:38:00 UTC
[flink] branch master updated: [FLINK-11282][network] Merge
StreamRecordWriter into RecordWriter (#7438)
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b12d392 [FLINK-11282][network] Merge StreamRecordWriter into RecordWriter (#7438)
b12d392 is described below
commit b12d392e7c1e7e2baca995d475fa259efc6af246
Author: zhijiang <wa...@aliyun.com>
AuthorDate: Thu Jan 24 01:37:51 2019 +0800
[FLINK-11282][network] Merge StreamRecordWriter into RecordWriter (#7438)
[FLINK-11282][network] Merge StreamRecordWriter into RecordWriter
---
.../io/network/api/writer/RecordWriter.java | 132 ++++++++++++++-
.../streaming/runtime/io/RecordWriterOutput.java | 8 +-
.../streaming/runtime/io/StreamRecordWriter.java | 181 ---------------------
.../streaming/runtime/tasks/OperatorChain.java | 10 +-
.../flink/streaming/runtime/tasks/StreamTask.java | 28 ++--
.../io/benchmark/LongRecordWriterThread.java | 5 +-
.../StreamNetworkBenchmarkEnvironment.java | 6 +-
.../StreamNetworkPointToPointBenchmark.java | 2 +-
.../operators/StreamOperatorChainingTest.java | 2 +-
9 files changed, 155 insertions(+), 219 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 6a691c3..d1f8d3d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -30,11 +30,15 @@ import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.util.XORShiftRandom;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.util.Optional;
import java.util.Random;
import static org.apache.flink.runtime.io.network.api.serialization.RecordSerializer.SerializationResult;
+import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkState;
/**
@@ -52,7 +56,9 @@ import static org.apache.flink.util.Preconditions.checkState;
*/
public class RecordWriter<T extends IOReadableWritable> {
- protected final ResultPartitionWriter targetPartition;
+ private static final Logger LOG = LoggerFactory.getLogger(RecordWriter.class);
+
+ private final ResultPartitionWriter targetPartition;
private final ChannelSelector<T> channelSelector;
@@ -66,23 +72,35 @@ public class RecordWriter<T extends IOReadableWritable> {
private final Random rng = new XORShiftRandom();
- private final boolean flushAlways;
-
private Counter numBytesOut = new SimpleCounter();
private Counter numBuffersOut = new SimpleCounter();
+ private final boolean flushAlways;
+
+ /** Default name for teh output flush thread, if no name with a task reference is given. */
+ private static final String DEFAULT_OUTPUT_FLUSH_THREAD_NAME = "OutputFlusher";
+
+ /** The thread that periodically flushes the output, to give an upper latency bound. */
+ private final Optional<OutputFlusher> outputFlusher;
+
+ /** To avoid synchronization overhead on the critical path, best-effort error tracking is enough here.*/
+ private Throwable flusherException;
+
public RecordWriter(ResultPartitionWriter writer) {
this(writer, new RoundRobinChannelSelector<T>());
}
@SuppressWarnings("unchecked")
public RecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector) {
- this(writer, channelSelector, false);
+ this(writer, channelSelector, -1, null);
}
- public RecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector, boolean flushAlways) {
- this.flushAlways = flushAlways;
+ public RecordWriter(
+ ResultPartitionWriter writer,
+ ChannelSelector<T> channelSelector,
+ long timeout,
+ String taskName) {
this.targetPartition = writer;
this.channelSelector = channelSelector;
this.numberOfChannels = writer.getNumberOfSubpartitions();
@@ -95,9 +113,23 @@ public class RecordWriter<T extends IOReadableWritable> {
broadcastChannels[i] = i;
bufferBuilders[i] = Optional.empty();
}
+
+ checkArgument(timeout >= -1);
+ this.flushAlways = (timeout == 0);
+ if (timeout == -1 || timeout == 0) {
+ outputFlusher = Optional.empty();
+ } else {
+ String threadName = taskName == null ?
+ DEFAULT_OUTPUT_FLUSH_THREAD_NAME :
+ DEFAULT_OUTPUT_FLUSH_THREAD_NAME + " for " + taskName;
+
+ outputFlusher = Optional.of(new OutputFlusher(threadName, timeout));
+ outputFlusher.get().start();
+ }
}
public void emit(T record) throws IOException, InterruptedException {
+ checkErroneous();
emit(record, channelSelector.selectChannels(record));
}
@@ -106,6 +138,7 @@ public class RecordWriter<T extends IOReadableWritable> {
* the {@link ChannelSelector}.
*/
public void broadcastEmit(T record) throws IOException, InterruptedException {
+ checkErroneous();
emit(record, broadcastChannels);
}
@@ -113,8 +146,8 @@ public class RecordWriter<T extends IOReadableWritable> {
* This is used to send LatencyMarks to a random target channel.
*/
public void randomEmit(T record) throws IOException, InterruptedException {
+ checkErroneous();
serializer.serializeRecord(record);
-
if (copyFromSerializerToTargetChannel(rng.nextInt(numberOfChannels))) {
serializer.prune();
}
@@ -245,4 +278,89 @@ public class RecordWriter<T extends IOReadableWritable> {
bufferBuilders[targetChannel] = Optional.empty();
}
}
+
+ /**
+ * Closes the writer. This stops the flushing thread (if there is one).
+ */
+ public void close() {
+ clearBuffers();
+ // make sure we terminate the thread in any case
+ if (outputFlusher.isPresent()) {
+ outputFlusher.get().terminate();
+ try {
+ outputFlusher.get().join();
+ } catch (InterruptedException e) {
+ // ignore on close
+ // restore interrupt flag to fast exit further blocking calls
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ /**
+ * Notifies the writer that the output flusher thread encountered an exception.
+ *
+ * @param t The exception to report.
+ */
+ private void notifyFlusherException(Throwable t) {
+ if (flusherException == null) {
+ LOG.error("An exception happened while flushing the outputs", t);
+ flusherException = t;
+ }
+ }
+
+ private void checkErroneous() throws IOException {
+ if (flusherException != null) {
+ throw new IOException("An exception happened while flushing the outputs", flusherException);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+
+ /**
+ * A dedicated thread that periodically flushes the output buffers, to set upper latency bounds.
+ *
+ * <p>The thread is daemonic, because it is only a utility thread.
+ */
+ private class OutputFlusher extends Thread {
+
+ private final long timeout;
+
+ private volatile boolean running = true;
+
+ OutputFlusher(String name, long timeout) {
+ super(name);
+ setDaemon(true);
+ this.timeout = timeout;
+ }
+
+ public void terminate() {
+ running = false;
+ interrupt();
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (running) {
+ try {
+ Thread.sleep(timeout);
+ } catch (InterruptedException e) {
+ // propagate this if we are still running, because it should not happen
+ // in that case
+ if (running) {
+ throw new Exception(e);
+ }
+ }
+
+ // any errors here should let the thread come to a halt and be
+ // recognized by the writer
+ flushAll();
+ }
+ } catch (Throwable t) {
+ notifyFlusherException(t);
+ }
+ }
+ }
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index d2b4d1f..31fbcef 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -45,7 +45,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
@Internal
public class RecordWriterOutput<OUT> implements OperatorChain.WatermarkGaugeExposingOutput<StreamRecord<OUT>> {
- private StreamRecordWriter<SerializationDelegate<StreamElement>> recordWriter;
+ private RecordWriter<SerializationDelegate<StreamElement>> recordWriter;
private SerializationDelegate<StreamElement> serializationDelegate;
@@ -57,7 +57,7 @@ public class RecordWriterOutput<OUT> implements OperatorChain.WatermarkGaugeExpo
@SuppressWarnings("unchecked")
public RecordWriterOutput(
- StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
+ RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
TypeSerializer<OUT> outSerializer,
OutputTag outputTag,
StreamStatusProvider streamStatusProvider) {
@@ -66,8 +66,8 @@ public class RecordWriterOutput<OUT> implements OperatorChain.WatermarkGaugeExpo
this.outputTag = outputTag;
// generic hack: cast the writer to generic Object type so we can use it
// with multiplexed records and watermarks
- this.recordWriter = (StreamRecordWriter<SerializationDelegate<StreamElement>>)
- (StreamRecordWriter<?>) recordWriter;
+ this.recordWriter = (RecordWriter<SerializationDelegate<StreamElement>>)
+ (RecordWriter<?>) recordWriter;
TypeSerializer<StreamElement> outRecordSerializer =
new StreamElementSerializer<>(outSerializer);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
deleted file mode 100644
index acda489..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-
-import java.io.IOException;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-
-/**
- * This record writer keeps data in buffers at most for a certain timeout. It spawns a separate thread
- * that flushes the outputs in a defined interval, to make sure data does not linger in the buffers for too long.
- *
- * @param <T> The type of elements written.
- */
-@Internal
-public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
-
- /** Default name for the output flush thread, if no name with a task reference is given. */
- private static final String DEFAULT_OUTPUT_FLUSH_THREAD_NAME = "OutputFlusher";
-
-
- /** The thread that periodically flushes the output, to give an upper latency bound. */
- private final OutputFlusher outputFlusher;
-
- /** The exception encountered in the flushing thread. */
- private Throwable flusherException;
-
- public StreamRecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector, long timeout) {
- this(writer, channelSelector, timeout, null);
- }
-
- public StreamRecordWriter(
- ResultPartitionWriter writer,
- ChannelSelector<T> channelSelector,
- long timeout,
- String taskName) {
- super(writer, channelSelector, timeout == 0);
-
- checkArgument(timeout >= -1);
-
- if (timeout == -1) {
- outputFlusher = null;
- }
- else if (timeout == 0) {
- outputFlusher = null;
- }
- else {
- String threadName = taskName == null ?
- DEFAULT_OUTPUT_FLUSH_THREAD_NAME :
- DEFAULT_OUTPUT_FLUSH_THREAD_NAME + " for " + taskName;
-
- outputFlusher = new OutputFlusher(threadName, timeout);
- outputFlusher.start();
- }
- }
-
- @Override
- public void emit(T record) throws IOException, InterruptedException {
- checkErroneous();
- super.emit(record);
- }
-
- @Override
- public void broadcastEmit(T record) throws IOException, InterruptedException {
- checkErroneous();
- super.broadcastEmit(record);
- }
-
- @Override
- public void randomEmit(T record) throws IOException, InterruptedException {
- checkErroneous();
- super.randomEmit(record);
- }
-
- /**
- * Closes the writer. This stops the flushing thread (if there is one).
- */
- public void close() {
- clearBuffers();
- // make sure we terminate the thread in any case
- if (outputFlusher != null) {
- outputFlusher.terminate();
- try {
- outputFlusher.join();
- }
- catch (InterruptedException e) {
- // ignore on close
- // restore interrupt flag to fast exit further blocking calls
- Thread.currentThread().interrupt();
- }
- }
- }
-
- /**
- * Notifies the writer that the output flusher thread encountered an exception.
- *
- * @param t The exception to report.
- */
- private void notifyFlusherException(Throwable t) {
- if (this.flusherException == null) {
- this.flusherException = t;
- }
- }
-
- private void checkErroneous() throws IOException {
- if (flusherException != null) {
- throw new IOException("An exception happened while flushing the outputs", flusherException);
- }
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * A dedicated thread that periodically flushes the output buffers, to set upper latency bounds.
- *
- * <p>The thread is daemonic, because it is only a utility thread.
- */
- private class OutputFlusher extends Thread {
-
- private final long timeout;
-
- private volatile boolean running = true;
-
- OutputFlusher(String name, long timeout) {
- super(name);
- setDaemon(true);
- this.timeout = timeout;
- }
-
- public void terminate() {
- running = false;
- interrupt();
- }
-
- @Override
- public void run() {
- try {
- while (running) {
- try {
- Thread.sleep(timeout);
- }
- catch (InterruptedException e) {
- // propagate this if we are still running, because it should not happen
- // in that case
- if (running) {
- throw new Exception(e);
- }
- }
-
- // any errors here should let the thread come to a halt and be
- // recognized by the writer
- flushAll();
- }
- }
- catch (Throwable t) {
- notifyFlusherException(t);
- }
- }
- }
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index d49b023..d13ab01 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
@@ -42,7 +43,6 @@ import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
-import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -96,7 +96,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
public OperatorChain(
StreamTask<OUT, OP> containingTask,
- List<StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>> streamRecordWriters) {
+ List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters) {
final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
final StreamConfig configuration = containingTask.getConfiguration();
@@ -119,7 +119,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
StreamEdge outEdge = outEdgesInOrder.get(i);
RecordWriterOutput<?> streamOutput = createStreamOutput(
- streamRecordWriters.get(i),
+ recordWriters.get(i),
outEdge,
chainedConfigs.get(outEdge.getSourceId()),
containingTask.getEnvironment());
@@ -389,7 +389,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
}
private RecordWriterOutput<OUT> createStreamOutput(
- StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> streamRecordWriter,
+ RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
StreamEdge edge,
StreamConfig upStreamConfig,
Environment taskEnvironment) {
@@ -406,7 +406,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
}
- return new RecordWriterOutput<>(streamRecordWriter, outSerializer, sideOutputTag, this);
+ return new RecordWriterOutput<>(recordWriter, outSerializer, sideOutputTag, this);
}
// ------------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 9ee8892..a38886e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -49,7 +50,6 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
-import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -177,7 +177,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
/** Wrapper for synchronousCheckpointExceptionHandler to deal with rethrown exceptions. Used in the async part. */
private AsyncCheckpointExceptionHandler asynchronousCheckpointExceptionHandler;
- private final List<StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>> streamRecordWriters;
+ private final List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters;
// ------------------------------------------------------------------------
@@ -209,7 +209,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
this.timerService = timeProvider;
this.configuration = new StreamConfig(getTaskConfiguration());
this.accumulatorMap = getEnvironment().getAccumulatorRegistry().getUserMap();
- this.streamRecordWriters = createStreamRecordWriters(configuration, environment);
+ this.recordWriters = createRecordWriters(configuration, environment);
}
// ------------------------------------------------------------------------
@@ -264,7 +264,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory);
}
- operatorChain = new OperatorChain<>(this, streamRecordWriters);
+ operatorChain = new OperatorChain<>(this, recordWriters);
headOperator = operatorChain.getHeadOperator();
// task specific initialization
@@ -650,9 +650,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
Exception exception = null;
- for (StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> streamRecordWriter : streamRecordWriters) {
+ for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter : recordWriters) {
try {
- streamRecordWriter.broadcastEvent(message);
+ recordWriter.broadcastEvent(message);
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(
new Exception("Could not send cancel checkpoint marker to downstream tasks.", e),
@@ -1157,27 +1157,27 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
@VisibleForTesting
- public static <OUT> List<StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createStreamRecordWriters(
+ public static <OUT> List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters(
StreamConfig configuration,
Environment environment) {
- List<StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>> streamRecordWriters = new ArrayList<>();
+ List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters = new ArrayList<>();
List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(environment.getUserClassLoader());
Map<Integer, StreamConfig> chainedConfigs = configuration.getTransitiveChainedTaskConfigsWithSelf(environment.getUserClassLoader());
for (int i = 0; i < outEdgesInOrder.size(); i++) {
StreamEdge edge = outEdgesInOrder.get(i);
- streamRecordWriters.add(
- createStreamRecordWriter(
+ recordWriters.add(
+ createRecordWriter(
edge,
i,
environment,
environment.getTaskInfo().getTaskName(),
chainedConfigs.get(edge.getSourceId()).getBufferTimeout()));
}
- return streamRecordWriters;
+ return recordWriters;
}
- private static <OUT> StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> createStreamRecordWriter(
+ private static <OUT> RecordWriter<SerializationDelegate<StreamRecord<OUT>>> createRecordWriter(
StreamEdge edge,
int outputIndex,
Environment environment,
@@ -1198,8 +1198,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
}
- StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> output =
- new StreamRecordWriter<>(bufferWriter, outputPartitioner, bufferTimeout, taskName);
+ RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output =
+ new RecordWriter<>(bufferWriter, outputPartitioner, bufferTimeout, taskName);
output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
return output;
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java
index ba3294b..f72c3b1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.runtime.io.benchmark;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
import org.apache.flink.types.LongValue;
import java.io.IOException;
@@ -34,7 +33,7 @@ import static org.apache.flink.util.Preconditions.checkState;
* records.
*/
public class LongRecordWriterThread extends CheckedThread {
- private final StreamRecordWriter<LongValue> recordWriter;
+ private final RecordWriter<LongValue> recordWriter;
private final boolean broadcastMode;
/**
@@ -45,7 +44,7 @@ public class LongRecordWriterThread extends CheckedThread {
private volatile boolean running = true;
public LongRecordWriterThread(
- StreamRecordWriter<LongValue> recordWriter,
+ RecordWriter<LongValue> recordWriter,
boolean broadcastMode) {
this.recordWriter = checkNotNull(recordWriter);
this.broadcastMode = broadcastMode;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index 941b937..3a69875 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
@@ -52,7 +53,6 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
import java.io.IOException;
import java.net.InetAddress;
@@ -182,9 +182,9 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
return receiver;
}
- public StreamRecordWriter<T> createRecordWriter(int partitionIndex, long flushTimeout) throws Exception {
+ public RecordWriter<T> createRecordWriter(int partitionIndex, long flushTimeout) throws Exception {
ResultPartitionWriter sender = createResultPartition(jobId, partitionIds[partitionIndex], senderEnv, channels);
- return new StreamRecordWriter<>(sender, new RoundRobinChannelSelector<T>(), flushTimeout);
+ return new RecordWriter<>(sender, new RoundRobinChannelSelector<T>(), flushTimeout, null);
}
private void generatePartitionIds() throws Exception {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
index bf6fdc4..7488688 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
@@ -71,7 +71,7 @@ public class StreamNetworkPointToPointBenchmark {
*
* @param flushTimeout
* output flushing interval of the
- * {@link org.apache.flink.streaming.runtime.io.StreamRecordWriter}'s output flusher thread
+ * {@link org.apache.flink.runtime.io.network.api.writer.RecordWriter}'s output flusher thread
*/
public void setUp(long flushTimeout, Configuration config) throws Exception {
environment = new StreamNetworkBenchmarkEnvironment<>();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
index 9c3b08f..61a6a66 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
@@ -269,7 +269,7 @@ public class StreamOperatorChainingTest {
StreamConfig streamConfig,
Environment environment,
StreamTask<IN, OT> task) {
- return new OperatorChain<>(task, StreamTask.createStreamRecordWriters(streamConfig, environment));
+ return new OperatorChain<>(task, StreamTask.createRecordWriters(streamConfig, environment));
}
private <IN, OT extends StreamOperator<IN>> StreamTask<IN, OT> createMockTask(