You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2020/09/24 16:05:15 UTC
[flink] 04/07: [FLINK-19297][network] Make ResultPartitionWriter
record-oriented
This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 900a896922403f4c1538d4b785d17b7cf72abcd7
Author: kevin.cyj <ke...@alibaba-inc.com>
AuthorDate: Wed Aug 26 14:22:25 2020 +0800
[FLINK-19297][network] Make ResultPartitionWriter record-oriented
Currently, the ResultPartitionWriter is buffer-oriented, that is, RecordWriter can only add buffers of different channels to ResultPartitionWriter and the buffer boundary serves as a nature boundary of data belonging to different channels. However, this abstraction is not flexible enough to handle new implementations like sort-based partitioning where records are appended a joint structure shared by all channels and sorting is used to cluster data belonging to different channels. This [...]
---
.../api/serialization/RecordSerializer.java | 94 ------
.../serialization/SpanningRecordSerializer.java | 114 --------
.../network/api/writer/BroadcastRecordWriter.java | 129 +-------
.../api/writer/ChannelSelectorRecordWriter.java | 82 +-----
.../io/network/api/writer/RecordWriter.java | 171 +++--------
.../network/api/writer/ResultPartitionWriter.java | 55 ++--
.../partition/BufferWritingResultPartition.java | 179 ++++++++++--
.../io/network/partition/ResultPartition.java | 21 +-
.../operators/shipping/OutputCollector.java | 3 -
...bleNotifyingResultPartitionWriterDecorator.java | 64 ++--
.../SpanningRecordSerializationTest.java | 65 +++--
.../SpanningRecordSerializerTest.java | 196 -------------
.../AbstractCollectingResultPartitionWriter.java | 70 ++---
.../api/writer/BroadcastRecordWriterTest.java | 77 +++--
.../RecordCollectingResultPartitionWriter.java | 4 +-
...cordOrEventCollectingResultPartitionWriter.java | 35 +--
.../api/writer/RecordWriterDelegateTest.java | 53 ++--
.../io/network/api/writer/RecordWriterTest.java | 325 +++++----------------
.../network/netty/PartitionRequestQueueTest.java | 21 +-
.../partition/MockResultPartitionWriter.java | 34 ++-
.../PartialConsumePipelinedResultTest.java | 7 +-
.../io/network/partition/PartitionTestUtils.java | 12 -
.../partition/PipelinedSubpartitionTest.java | 26 +-
.../io/network/partition/ResultPartitionTest.java | 252 ++++++++++------
.../IteratorWrappingTestSingleInputGate.java | 13 +-
.../partition/consumer/LocalInputChannelTest.java | 26 +-
.../partition/consumer/SingleInputGateTest.java | 6 +-
.../io/network/util/TestPartitionProducer.java | 16 +-
.../io/network/util/TestProducerSource.java | 14 +-
.../io/network/util/TestSubpartitionProducer.java | 13 +-
.../operators/testutils/MockEnvironment.java | 2 +-
.../consumer/StreamTestSingleInputGate.java | 11 +-
.../runtime/io/StreamTaskNetworkInputTest.java | 12 +-
.../runtime/tasks/StreamMockEnvironment.java | 2 -
.../tasks/SubtaskCheckpointCoordinatorTest.java | 4 +-
35 files changed, 785 insertions(+), 1423 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
deleted file mode 100644
index 6f73917..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.api.serialization;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-
-import java.io.IOException;
-
-/**
- * Interface for turning records into sequences of memory segments.
- */
-public interface RecordSerializer<T extends IOReadableWritable> {
-
- /**
- * Status of the serialization result.
- */
- enum SerializationResult {
- PARTIAL_RECORD_MEMORY_SEGMENT_FULL(false, true),
- FULL_RECORD_MEMORY_SEGMENT_FULL(true, true),
- FULL_RECORD(true, false);
-
- private final boolean isFullRecord;
-
- private final boolean isFullBuffer;
-
- SerializationResult(boolean isFullRecord, boolean isFullBuffer) {
- this.isFullRecord = isFullRecord;
- this.isFullBuffer = isFullBuffer;
- }
-
- /**
- * Whether the full record was serialized and completely written to
- * a target buffer.
- *
- * @return <tt>true</tt> if the complete record was written
- */
- public boolean isFullRecord() {
- return this.isFullRecord;
- }
-
- /**
- * Whether the target buffer is full after the serialization process.
- *
- * @return <tt>true</tt> if the target buffer is full
- */
- public boolean isFullBuffer() {
- return this.isFullBuffer;
- }
- }
-
- /**
- * Starts serializing the given record to an intermediate data buffer.
- *
- * @param record the record to serialize
- */
- void serializeRecord(T record) throws IOException;
-
- /**
- * Copies the intermediate data serialization buffer to the given target buffer.
- *
- * @param bufferBuilder the new target buffer to use
- * @return how much information was written to the target buffer and
- * whether this buffer is full
- */
- SerializationResult copyToBufferBuilder(BufferBuilder bufferBuilder);
-
- /**
- * Supports copying an intermediate data serialization buffer to multiple target buffers
- * by resetting its initial position before each copying.
- */
- void reset();
-
- /**
- * @return <tt>true</tt> if has some serialized data pending copying to the result {@link BufferBuilder}.
- */
- boolean hasSerializedData();
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
deleted file mode 100644
index 1bb9ec8..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.api.serialization;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.DataOutputSerializer;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-/**
- * Record serializer which serializes the complete record to an intermediate
- * data serialization buffer and copies this buffer to target buffers
- * one-by-one using {@link #copyToBufferBuilder(BufferBuilder)}.
- *
- * @param <T> The type of the records that are serialized.
- */
-public class SpanningRecordSerializer<T extends IOReadableWritable> implements RecordSerializer<T> {
-
- /** Flag to enable/disable checks, if buffer not set/full or pending serialization. */
- private static final boolean CHECKED = false;
-
- /** Intermediate data serialization. */
- private final DataOutputSerializer serializationBuffer;
-
- /** Intermediate buffer for data serialization (wrapped from {@link #serializationBuffer}). */
- private ByteBuffer dataBuffer;
-
- public SpanningRecordSerializer() {
- serializationBuffer = new DataOutputSerializer(128);
-
- // ensure initial state with hasRemaining false (for correct continueWritingWithNextBufferBuilder logic)
- dataBuffer = serializationBuffer.wrapAsByteBuffer();
- }
-
- /**
- * Serializes the complete record to an intermediate data serialization buffer.
- *
- * @param record the record to serialize
- */
- @Override
- public void serializeRecord(T record) throws IOException {
- if (CHECKED) {
- if (dataBuffer.hasRemaining()) {
- throw new IllegalStateException("Pending serialization of previous record.");
- }
- }
-
- serializationBuffer.clear();
- // the initial capacity of the serialization buffer should be no less than 4
- serializationBuffer.skipBytesToWrite(4);
-
- // write data and length
- record.write(serializationBuffer);
-
- int len = serializationBuffer.length() - 4;
- serializationBuffer.setPosition(0);
- serializationBuffer.writeInt(len);
- serializationBuffer.skipBytesToWrite(len);
-
- dataBuffer = serializationBuffer.wrapAsByteBuffer();
- }
-
- /**
- * Copies an intermediate data serialization buffer into the target BufferBuilder.
- *
- * @param targetBuffer the target BufferBuilder to copy to
- * @return how much information was written to the target buffer and
- * whether this buffer is full
- */
- @Override
- public SerializationResult copyToBufferBuilder(BufferBuilder targetBuffer) {
- targetBuffer.append(dataBuffer);
- targetBuffer.commit();
-
- return getSerializationResult(targetBuffer);
- }
-
- private SerializationResult getSerializationResult(BufferBuilder targetBuffer) {
- if (dataBuffer.hasRemaining()) {
- return SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL;
- }
- return !targetBuffer.isFull()
- ? SerializationResult.FULL_RECORD
- : SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL;
- }
-
- @Override
- public void reset() {
- dataBuffer.position(0);
- }
-
- @Override
- public boolean hasSerializedData() {
- return dataBuffer.hasRemaining();
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
index b834738..ec800dc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
@@ -18,40 +18,20 @@
package org.apache.flink.runtime.io.network.api.writer;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
-
-import javax.annotation.Nullable;
import java.io.IOException;
-import static org.apache.flink.util.Preconditions.checkState;
-
/**
* A special record-oriented runtime result writer only for broadcast mode.
*
- * <p>The BroadcastRecordWriter extends the {@link RecordWriter} and maintain a single {@link BufferBuilder}
- * for all the channels. Then the serialization results need be copied only once to this buffer which would be
- * shared for all the channels in a more efficient way.
+ * <p>The BroadcastRecordWriter extends the {@link RecordWriter} and emits records to all channels for
+ * regular {@link #emit(IOReadableWritable)}.
*
* @param <T> the type of the record that can be emitted with this record writer
*/
public final class BroadcastRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
- /** The current buffer builder shared for all the channels. */
- @Nullable
- private BufferBuilder bufferBuilder;
-
- /**
- * The flag for judging whether {@link #requestNewBufferBuilder(int)} and {@link #flushTargetPartition(int)}
- * is triggered by {@link #randomEmit(IOReadableWritable)} or not.
- */
- private boolean randomTriggered;
-
- private BufferConsumer randomTriggeredConsumer;
-
BroadcastRecordWriter(
ResultPartitionWriter writer,
long timeout,
@@ -60,113 +40,18 @@ public final class BroadcastRecordWriter<T extends IOReadableWritable> extends R
}
@Override
- public void emit(T record) throws IOException, InterruptedException {
+ public void emit(T record) throws IOException {
broadcastEmit(record);
}
@Override
- public void randomEmit(T record) throws IOException, InterruptedException {
- randomEmit(record, rng.nextInt(numberOfChannels));
- }
-
- /**
- * For non-broadcast emit, we try to finish the current {@link BufferBuilder} first, and then request
- * a new {@link BufferBuilder} for the random channel. If this new {@link BufferBuilder} is not full,
- * it can be shared for all the other channels via initializing readable position in created
- * {@link BufferConsumer}.
- */
- @VisibleForTesting
- void randomEmit(T record, int targetChannelIndex) throws IOException, InterruptedException {
- tryFinishCurrentBufferBuilder(targetChannelIndex);
-
- randomTriggered = true;
- emit(record, targetChannelIndex);
- randomTriggered = false;
-
- if (bufferBuilder != null) {
- for (int index = 0; index < numberOfChannels; index++) {
- if (index != targetChannelIndex) {
- addBufferConsumer(randomTriggeredConsumer.copyWithReaderPosition(bufferBuilder.getCommittedBytes()), index);
- }
- }
- }
- }
+ public void broadcastEmit(T record) throws IOException {
+ checkErroneous();
- @Override
- public void broadcastEmit(T record) throws IOException, InterruptedException {
- // We could actually select any target channel here because all the channels
- // are sharing the same BufferBuilder in broadcast mode.
- emit(record, 0);
- }
+ targetPartition.broadcastRecord(serializeRecord(serializer, record));
- /**
- * The flush could be triggered by {@link #randomEmit(IOReadableWritable)}, {@link #emit(IOReadableWritable)}
- * or {@link #broadcastEmit(IOReadableWritable)}. Only random emit should flush a single target channel,
- * otherwise we should flush all the channels.
- */
- @Override
- public void flushTargetPartition(int targetChannel) {
- if (randomTriggered) {
- super.flushTargetPartition(targetChannel);
- } else {
+ if (flushAlways) {
flushAll();
}
}
-
- @Override
- public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
- return bufferBuilder != null ? bufferBuilder : requestNewBufferBuilder(targetChannel);
- }
-
- /**
- * The request could be from broadcast or non-broadcast modes like {@link #randomEmit(IOReadableWritable)}.
- *
- * <p>For non-broadcast, the created {@link BufferConsumer} is only for the target channel.
- *
- * <p>For broadcast, all the channels share the same requested {@link BufferBuilder} and the created
- * {@link BufferConsumer} is copied for every channel.
- */
- @Override
- public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
- checkState(bufferBuilder == null || bufferBuilder.isFinished());
-
- BufferBuilder builder = super.requestNewBufferBuilder(targetChannel);
- if (randomTriggered) {
- addBufferConsumer(randomTriggeredConsumer = builder.createBufferConsumer(), targetChannel);
- } else {
- try (BufferConsumer bufferConsumer = builder.createBufferConsumer()) {
- for (int channel = 0; channel < numberOfChannels; channel++) {
- addBufferConsumer(bufferConsumer.copy(), channel);
- }
- }
- }
-
- bufferBuilder = builder;
- return builder;
- }
-
- @Override
- public void tryFinishCurrentBufferBuilder(int targetChannel) {
- if (bufferBuilder == null) {
- return;
- }
-
- BufferBuilder builder = bufferBuilder;
- bufferBuilder = null;
-
- finishBufferBuilder(builder);
- }
-
- @Override
- public void emptyCurrentBufferBuilder(int targetChannel) {
- bufferBuilder = null;
- }
-
- @Override
- public void closeBufferBuilders() {
- if (bufferBuilder != null) {
- bufferBuilder.finish();
- bufferBuilder = null;
- }
- }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
index 5e32056..b94207e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
@@ -19,19 +19,17 @@
package org.apache.flink.runtime.io.network.api.writer;
import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import java.io.IOException;
+import java.nio.ByteBuffer;
import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
/**
* A regular record-oriented runtime result writer.
*
- * <p>The ChannelSelectorRecordWriter extends the {@link RecordWriter} and maintains an array of
- * {@link BufferBuilder}s for all the channels. The {@link #emit(IOReadableWritable)}
- * operation is based on {@link ChannelSelector} to select the target channel.
+ * <p>The ChannelSelectorRecordWriter extends the {@link RecordWriter} and emits records to the channel
+ * selected by the {@link ChannelSelector} for regular {@link #emit(IOReadableWritable)}.
*
* @param <T> the type of the record that can be emitted with this record writer
*/
@@ -39,9 +37,6 @@ public final class ChannelSelectorRecordWriter<T extends IOReadableWritable> ext
private final ChannelSelector<T> channelSelector;
- /** Every subpartition maintains a separate buffer builder which might be null. */
- private final BufferBuilder[] bufferBuilders;
-
ChannelSelectorRecordWriter(
ResultPartitionWriter writer,
ChannelSelector<T> channelSelector,
@@ -51,77 +46,28 @@ public final class ChannelSelectorRecordWriter<T extends IOReadableWritable> ext
this.channelSelector = checkNotNull(channelSelector);
this.channelSelector.setup(numberOfChannels);
-
- this.bufferBuilders = new BufferBuilder[numberOfChannels];
}
@Override
- public void emit(T record) throws IOException, InterruptedException {
+ public void emit(T record) throws IOException {
emit(record, channelSelector.selectChannel(record));
}
@Override
- public void randomEmit(T record) throws IOException, InterruptedException {
- emit(record, rng.nextInt(numberOfChannels));
- }
-
- /**
- * The record is serialized into intermediate serialization buffer which is then copied
- * into the target buffer for every channel.
- */
- @Override
- public void broadcastEmit(T record) throws IOException, InterruptedException {
+ public void broadcastEmit(T record) throws IOException {
checkErroneous();
- serializer.serializeRecord(record);
-
- for (int targetChannel = 0; targetChannel < numberOfChannels; targetChannel++) {
- copyFromSerializerToTargetChannel(targetChannel);
- }
- }
-
- @Override
- public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
- if (bufferBuilders[targetChannel] != null) {
- return bufferBuilders[targetChannel];
- } else {
- return requestNewBufferBuilder(targetChannel);
- }
- }
-
- @Override
- public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
- checkState(bufferBuilders[targetChannel] == null || bufferBuilders[targetChannel].isFinished());
-
- BufferBuilder bufferBuilder = super.requestNewBufferBuilder(targetChannel);
- addBufferConsumer(bufferBuilder.createBufferConsumer(), targetChannel);
- bufferBuilders[targetChannel] = bufferBuilder;
- return bufferBuilder;
- }
-
- @Override
- public void tryFinishCurrentBufferBuilder(int targetChannel) {
- if (bufferBuilders[targetChannel] == null) {
- return;
+ // Emitting to all channels in a for loop can be better than calling
+ // ResultPartitionWriter#broadcastRecord because the broadcastRecord
+ // method incurs extra overhead.
+ ByteBuffer serializedRecord = serializeRecord(serializer, record);
+ for (int channelIndex = 0; channelIndex < numberOfChannels; channelIndex++) {
+ serializedRecord.rewind();
+ emit(record, channelIndex);
}
- BufferBuilder bufferBuilder = bufferBuilders[targetChannel];
- bufferBuilders[targetChannel] = null;
- finishBufferBuilder(bufferBuilder);
- }
-
- @Override
- public void emptyCurrentBufferBuilder(int targetChannel) {
- bufferBuilders[targetChannel] = null;
- }
-
- @Override
- public void closeBufferBuilders() {
- for (int targetChannel = 0; targetChannel < numberOfChannels; targetChannel++) {
- if (bufferBuilders[targetChannel] != null) {
- bufferBuilders[targetChannel].finish();
- bufferBuilders[targetChannel] = null;
- }
+ if (flushAlways) {
+ flushAll();
}
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 995e670..b58ef38 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -20,17 +20,9 @@ package org.apache.flink.runtime.io.network.api.writer;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Meter;
-import org.apache.flink.metrics.MeterView;
-import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.AvailabilityProvider;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
-import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.util.XORShiftRandom;
@@ -40,18 +32,17 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
-import static org.apache.flink.runtime.io.network.api.serialization.RecordSerializer.SerializationResult;
import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkState;
/**
* An abstract record-oriented runtime result writer.
*
* <p>The RecordWriter wraps the runtime's {@link ResultPartitionWriter} and takes care of
- * serializing records into buffers.
+ * channel selection and serializing records into bytes.
*
* @param <T> the type of the record that can be emitted with this record writer
*/
@@ -63,21 +54,15 @@ public abstract class RecordWriter<T extends IOReadableWritable> implements Avai
private static final Logger LOG = LoggerFactory.getLogger(RecordWriter.class);
- private final ResultPartitionWriter targetPartition;
+ protected final ResultPartitionWriter targetPartition;
protected final int numberOfChannels;
- protected final RecordSerializer<T> serializer;
+ protected final DataOutputSerializer serializer;
protected final Random rng = new XORShiftRandom();
- private Counter numBytesOut = new SimpleCounter();
-
- private Counter numBuffersOut = new SimpleCounter();
-
- protected Meter idleTimeMsPerSecond = new MeterView(new SimpleCounter());
-
- private final boolean flushAlways;
+ protected final boolean flushAlways;
/** The thread that periodically flushes the output, to give an upper latency bound. */
@Nullable
@@ -93,7 +78,7 @@ public abstract class RecordWriter<T extends IOReadableWritable> implements Avai
this.targetPartition = writer;
this.numberOfChannels = writer.getNumberOfSubpartitions();
- this.serializer = new SpanningRecordSerializer<T>();
+ this.serializer = new DataOutputSerializer(128);
checkArgument(timeout >= -1);
this.flushAlways = (timeout == 0);
@@ -109,89 +94,58 @@ public abstract class RecordWriter<T extends IOReadableWritable> implements Avai
}
}
- protected void emit(T record, int targetChannel) throws IOException, InterruptedException {
+ protected void emit(T record, int targetSubpartition) throws IOException {
checkErroneous();
- serializer.serializeRecord(record);
+ targetPartition.emitRecord(serializeRecord(serializer, record), targetSubpartition);
- // Make sure we don't hold onto the large intermediate serialization buffer for too long
- copyFromSerializerToTargetChannel(targetChannel);
+ if (flushAlways) {
+ targetPartition.flush(targetSubpartition);
+ }
}
- /**
- * @param targetChannel
- * @return <tt>true</tt> if the intermediate serialization buffer should be pruned
- */
- protected boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException {
- // We should reset the initial position of the intermediate serialization buffer before
- // copying, so the serialization results can be copied to multiple target buffers.
- serializer.reset();
-
- boolean pruneTriggered = false;
- BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
- SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
- while (result.isFullBuffer()) {
- finishBufferBuilder(bufferBuilder);
-
- // If this was a full record, we are done. Not breaking out of the loop at this point
- // will lead to another buffer request before breaking out (that would not be a
- // problem per se, but it can lead to stalls in the pipeline).
- if (result.isFullRecord()) {
- pruneTriggered = true;
- emptyCurrentBufferBuilder(targetChannel);
- break;
- }
+ public void broadcastEvent(AbstractEvent event) throws IOException {
+ broadcastEvent(event, false);
+ }
- bufferBuilder = requestNewBufferBuilder(targetChannel);
- result = serializer.copyToBufferBuilder(bufferBuilder);
- }
- checkState(!serializer.hasSerializedData(), "All data should be written at once");
+ public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
+ targetPartition.broadcastEvent(event, isPriorityEvent);
if (flushAlways) {
- flushTargetPartition(targetChannel);
+ flushAll();
}
- return pruneTriggered;
}
- public void broadcastEvent(AbstractEvent event) throws IOException {
- broadcastEvent(event, false);
- }
+ @VisibleForTesting
+ public static ByteBuffer serializeRecord(
+ DataOutputSerializer serializer,
+ IOReadableWritable record) throws IOException {
+ serializer.clear();
- public void broadcastEvent(AbstractEvent event, boolean hasPriority) throws IOException {
- try (BufferConsumer eventBufferConsumer = EventSerializer.toBufferConsumer(event, hasPriority)) {
- for (int targetChannel = 0; targetChannel < numberOfChannels; targetChannel++) {
- tryFinishCurrentBufferBuilder(targetChannel);
+ // the initial capacity should be no less than 4 bytes
+ serializer.skipBytesToWrite(4);
- // Retain the buffer so that it can be recycled by each channel of targetPartition
- targetPartition.addBufferConsumer(eventBufferConsumer.copy(), targetChannel);
- }
+ // write data
+ record.write(serializer);
- if (flushAlways) {
- flushAll();
- }
- }
+ // write length
+ int len = serializer.length() - 4;
+ serializer.setPosition(0);
+ serializer.writeInt(len);
+ serializer.skipBytesToWrite(len);
+
+ return serializer.wrapAsByteBuffer();
}
public void flushAll() {
targetPartition.flushAll();
}
- protected void flushTargetPartition(int targetChannel) {
- targetPartition.flush(targetChannel);
- }
-
/**
* Sets the metric group for this RecordWriter.
*/
public void setMetricGroup(TaskIOMetricGroup metrics) {
- numBytesOut = metrics.getNumBytesOutCounter();
- numBuffersOut = metrics.getNumBuffersOutCounter();
- idleTimeMsPerSecond = metrics.getIdleTimeMsPerSecond();
- }
-
- protected void finishBufferBuilder(BufferBuilder bufferBuilder) {
- numBytesOut.inc(bufferBuilder.finish());
- numBuffersOut.inc();
+ targetPartition.setMetricGroup(metrics);
}
@Override
@@ -202,44 +156,27 @@ public abstract class RecordWriter<T extends IOReadableWritable> implements Avai
/**
* This is used to send regular records.
*/
- public abstract void emit(T record) throws IOException, InterruptedException;
+ public abstract void emit(T record) throws IOException;
/**
* This is used to send LatencyMarks to a random target channel.
*/
- public abstract void randomEmit(T record) throws IOException, InterruptedException;
-
- /**
- * This is used to broadcast streaming Watermarks in-band with records.
- */
- public abstract void broadcastEmit(T record) throws IOException, InterruptedException;
-
- /**
- * The {@link BufferBuilder} may already exist if not filled up last time, otherwise we need
- * request a new one for this target channel.
- */
- abstract BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException;
-
- /**
- * Marks the current {@link BufferBuilder} as finished if present and clears the state for next one.
- */
- abstract void tryFinishCurrentBufferBuilder(int targetChannel);
+ public void randomEmit(T record) throws IOException {
+ checkErroneous();
- /**
- * Marks the current {@link BufferBuilder} as empty for the target channel.
- */
- abstract void emptyCurrentBufferBuilder(int targetChannel);
+ int targetSubpartition = rng.nextInt(numberOfChannels);
+ emit(record, targetSubpartition);
+ }
/**
- * Marks the current {@link BufferBuilder}s as finished and releases the resources.
+ * This is used to broadcast streaming Watermarks in-band with records.
*/
- abstract void closeBufferBuilders();
+ public abstract void broadcastEmit(T record) throws IOException;
/**
* Closes the writer. This stops the flushing thread (if there is one).
*/
public void close() {
- closeBufferBuilders();
// make sure we terminate the thread in any case
if (outputFlusher != null) {
outputFlusher.terminate();
@@ -277,28 +214,6 @@ public abstract class RecordWriter<T extends IOReadableWritable> implements Avai
}
}
- protected void addBufferConsumer(BufferConsumer consumer, int targetChannel) throws IOException {
- targetPartition.addBufferConsumer(consumer, targetChannel);
- }
-
- /**
- * Requests a new {@link BufferBuilder} for the target channel and returns it.
- */
- public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
- BufferBuilder builder = targetPartition.tryGetBufferBuilder(targetChannel);
- if (builder == null) {
- long start = System.currentTimeMillis();
- builder = targetPartition.getBufferBuilder(targetChannel);
- idleTimeMsPerSecond.markEvent(System.currentTimeMillis() - start);
- }
- return builder;
- }
-
- @VisibleForTesting
- public Meter getIdleTimeMsPerSecond() {
- return idleTimeMsPerSecond;
- }
-
// ------------------------------------------------------------------------
/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index 838850e..a90b0ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -18,19 +18,20 @@
package org.apache.flink.runtime.io.network.api.writer;
+import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.AvailabilityProvider;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.nio.ByteBuffer;
/**
- * A buffer-oriented runtime result writer API for producing results.
+ * A record-oriented runtime result writer API for producing results.
*
* <p>If {@link ResultPartitionWriter#close()} is called before {@link ResultPartitionWriter#fail(Throwable)} or
* {@link ResultPartitionWriter#finish()}, it abruptly triggers failure and cancellation of production.
@@ -51,30 +52,27 @@ public interface ResultPartitionWriter extends AutoCloseable, AvailabilityProvid
int getNumTargetKeyGroups();
/**
- * Requests a {@link BufferBuilder} from this partition for writing data.
+ * Writes the given serialized record to the target subpartition.
*/
- BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException;
+ void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException;
+ /**
+ * Writes the given serialized record to all subpartitions. One can also achieve the same effect by emitting
+ * the same record to all subpartitions one by one, however, this method can have better performance for the
+ * underlying implementation can do some optimizations, for example coping the given serialized record only
+ * once to a shared channel which can be consumed by all subpartitions.
+ */
+ void broadcastRecord(ByteBuffer record) throws IOException;
/**
- * Try to request a {@link BufferBuilder} from this partition for writing data.
- *
- * <p>Returns <code>null</code> if no buffer is available or the buffer provider has been destroyed.
+ * Writes the given {@link AbstractEvent} to all channels.
*/
- BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException;
+ void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException;
/**
- * Adds the bufferConsumer to the subpartition with the given index.
- *
- * <p>This method takes the ownership of the passed {@code bufferConsumer} and thus is responsible for releasing
- * it's resources.
- *
- * <p>To avoid problems with data re-ordering, before adding new {@link BufferConsumer} the previously added one
- * the given {@code subpartitionIndex} must be marked as {@link BufferConsumer#isFinished()}.
- *
- * @return true if operation succeeded and bufferConsumer was enqueued for consumption.
+ * Sets the metric group for the {@link ResultPartitionWriter}.
*/
- boolean addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex) throws IOException;
+ void setMetricGroup(TaskIOMetricGroup metrics);
/**
* Returns a reader for the subpartition with the given index.
@@ -82,12 +80,12 @@ public interface ResultPartitionWriter extends AutoCloseable, AvailabilityProvid
ResultSubpartitionView createSubpartitionView(int index, BufferAvailabilityListener availabilityListener) throws IOException;
/**
- * Manually trigger consumption from enqueued {@link BufferConsumer BufferConsumers} in all subpartitions.
+ * Manually trigger the consumption of data from all subpartitions.
*/
void flushAll();
/**
- * Manually trigger consumption from enqueued {@link BufferConsumer BufferConsumers} in one specified subpartition.
+ * Manually trigger the consumption of data from the given subpartitions.
*/
void flush(int subpartitionIndex);
@@ -108,4 +106,19 @@ public interface ResultPartitionWriter extends AutoCloseable, AvailabilityProvid
* <p>Closing of partition is still needed afterwards.
*/
void finish() throws IOException;
+
+ boolean isFinished();
+
+ /**
+ * Releases the partition writer which releases the produced data and no reader can consume the
+ * partition any more.
+ */
+ void release(Throwable cause);
+
+ boolean isReleased();
+
+ /**
+ * Closes the partition writer which releases the allocated resource, for example the buffer pool.
+ */
+ void close() throws Exception;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
index 437d0a0..bd9e8ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
@@ -19,16 +19,23 @@
package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.util.function.FunctionWithException;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.nio.ByteBuffer;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkElementIndex;
@@ -49,6 +56,14 @@ public class BufferWritingResultPartition extends ResultPartition {
/** The subpartitions of this partition. At least one. */
protected final ResultSubpartition[] subpartitions;
+ /** For non-broadcast mode, each subpartition maintains a separate BufferBuilder which might be null. */
+ private final BufferBuilder[] subpartitionBufferBuilders;
+
+ /** For broadcast mode, a single BufferBuilder is shared by all subpartitions. */
+ private BufferBuilder broadcastBufferBuilder;
+
+ private Meter idleTimeMsPerSecond = new MeterView(new SimpleCounter());
+
public BufferWritingResultPartition(
String owningTaskName,
int partitionIndex,
@@ -72,6 +87,7 @@ public class BufferWritingResultPartition extends ResultPartition {
bufferPoolFactory);
this.subpartitions = checkNotNull(subpartitions);
+ this.subpartitionBufferBuilders = new BufferBuilder[subpartitions.length];
}
public int getNumberOfQueuedBuffers() {
@@ -90,46 +106,58 @@ public class BufferWritingResultPartition extends ResultPartition {
}
@Override
- public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
- checkInProduceState();
-
- return bufferPool.requestBufferBuilderBlocking(targetChannel);
+ public void flushAll() {
+ for (ResultSubpartition subpartition : subpartitions) {
+ subpartition.flush();
+ }
}
@Override
- public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException {
- return bufferPool.requestBufferBuilder(targetChannel);
+ public void flush(int targetSubpartition) {
+ subpartitions[targetSubpartition].flush();
}
- @Override
- public boolean addBufferConsumer(
- BufferConsumer bufferConsumer,
- int subpartitionIndex) throws IOException {
- checkNotNull(bufferConsumer);
+ public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
+ do {
+ final BufferBuilder bufferBuilder = getSubpartitionBufferBuilder(targetSubpartition);
+ bufferBuilder.appendAndCommit(record);
- ResultSubpartition subpartition;
- try {
- checkInProduceState();
- subpartition = subpartitions[subpartitionIndex];
- }
- catch (Exception ex) {
- bufferConsumer.close();
- throw ex;
- }
+ if (bufferBuilder.isFull()) {
+ finishSubpartitionBufferBuilder(targetSubpartition);
+ }
+ } while (record.hasRemaining());
+ }
+
+ @Override
+ public void broadcastRecord(ByteBuffer record) throws IOException {
+ do {
+ final BufferBuilder bufferBuilder = getBroadcastBufferBuilder();
+ bufferBuilder.appendAndCommit(record);
- return subpartition.add(bufferConsumer);
+ if (bufferBuilder.isFull()) {
+ finishBroadcastBufferBuilder();
+ }
+ } while (record.hasRemaining());
}
@Override
- public void flushAll() {
- for (ResultSubpartition subpartition : subpartitions) {
- subpartition.flush();
+ public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
+ checkInProduceState();
+ finishBroadcastBufferBuilder();
+ finishSubpartitionBufferBuilders();
+
+ try (BufferConsumer eventBufferConsumer = EventSerializer.toBufferConsumer(event, isPriorityEvent)) {
+ for (ResultSubpartition subpartition : subpartitions) {
+ // Retain the buffer so that it can be recycled by each channel of targetPartition
+ subpartition.add(eventBufferConsumer.copy());
+ }
}
}
@Override
- public void flush(int targetSubpartition) {
- subpartitions[targetSubpartition].flush();
+ public void setMetricGroup(TaskIOMetricGroup metrics) {
+ super.setMetricGroup(metrics);
+ idleTimeMsPerSecond = metrics.getIdleTimeMsPerSecond();
}
@Override
@@ -149,9 +177,13 @@ public class BufferWritingResultPartition extends ResultPartition {
@Override
public void finish() throws IOException {
+ finishBroadcastBufferBuilder();
+ finishSubpartitionBufferBuilders();
+
for (ResultSubpartition subpartition : subpartitions) {
subpartition.finish();
}
+
super.finish();
}
@@ -169,6 +201,101 @@ public class BufferWritingResultPartition extends ResultPartition {
}
}
+ private BufferBuilder getSubpartitionBufferBuilder(int targetSubpartition) throws IOException {
+ final BufferBuilder bufferBuilder = subpartitionBufferBuilders[targetSubpartition];
+ if (bufferBuilder != null) {
+ return bufferBuilder;
+ }
+
+ return getNewSubpartitionBufferBuilder(targetSubpartition);
+ }
+
+ private BufferBuilder getNewSubpartitionBufferBuilder(int targetSubpartition) throws IOException {
+ checkInProduceState();
+ ensureUnicastMode();
+
+ final BufferBuilder bufferBuilder = requestNewBufferBuilderFromPool(targetSubpartition);
+ subpartitions[targetSubpartition].add(bufferBuilder.createBufferConsumer());
+ subpartitionBufferBuilders[targetSubpartition] = bufferBuilder;
+ return bufferBuilder;
+ }
+
+ private BufferBuilder getBroadcastBufferBuilder() throws IOException {
+ if (broadcastBufferBuilder != null) {
+ return broadcastBufferBuilder;
+ }
+
+ return getNewBroadcastBufferBuilder();
+ }
+
+ private BufferBuilder getNewBroadcastBufferBuilder() throws IOException {
+ checkInProduceState();
+ ensureBroadcastMode();
+
+ final BufferBuilder bufferBuilder = requestNewBufferBuilderFromPool(0);
+ broadcastBufferBuilder = bufferBuilder;
+
+ try (final BufferConsumer consumer = bufferBuilder.createBufferConsumer()) {
+ for (ResultSubpartition subpartition : subpartitions) {
+ subpartition.add(consumer.copy());
+ }
+ }
+
+ return bufferBuilder;
+ }
+
+ private BufferBuilder requestNewBufferBuilderFromPool(int targetSubpartition) throws IOException {
+ BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(targetSubpartition);
+ if (bufferBuilder != null) {
+ return bufferBuilder;
+ }
+
+ final long start = System.currentTimeMillis();
+ try {
+ bufferBuilder = bufferPool.requestBufferBuilderBlocking(targetSubpartition);
+ idleTimeMsPerSecond.markEvent(System.currentTimeMillis() - start);
+ return bufferBuilder;
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted while waiting for buffer");
+ }
+ }
+
+ private void finishSubpartitionBufferBuilder(int targetSubpartition) {
+ final BufferBuilder bufferBuilder = subpartitionBufferBuilders[targetSubpartition];
+ if (bufferBuilder != null) {
+ numBytesOut.inc(bufferBuilder.finish());
+ numBuffersOut.inc();
+ subpartitionBufferBuilders[targetSubpartition] = null;
+ }
+ }
+
+ private void finishSubpartitionBufferBuilders() {
+ for (int channelIndex = 0; channelIndex < numSubpartitions; channelIndex++) {
+ finishSubpartitionBufferBuilder(channelIndex);
+ }
+ }
+
+ private void finishBroadcastBufferBuilder() {
+ if (broadcastBufferBuilder != null) {
+ numBytesOut.inc(broadcastBufferBuilder.finish() * numSubpartitions);
+ numBuffersOut.inc(numSubpartitions);
+ broadcastBufferBuilder = null;
+ }
+ }
+
+ private void ensureUnicastMode() {
+ finishBroadcastBufferBuilder();
+ }
+
+ private void ensureBroadcastMode() {
+ finishSubpartitionBufferBuilders();
+ }
+
+ @VisibleForTesting
+ public Meter getIdleTimeMsPerSecond() {
+ return idleTimeMsPerSecond;
+ }
+
@VisibleForTesting
public ResultSubpartition[] getAllPartitions() {
return subpartitions;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 02c91c5..bd81c19 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.io.network.partition;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -27,6 +29,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.FunctionWithException;
@@ -103,6 +106,10 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
@Nullable
protected final BufferCompressor bufferCompressor;
+ protected Counter numBytesOut = new SimpleCounter();
+
+ protected Counter numBuffersOut = new SimpleCounter();
+
public ResultPartition(
String owningTaskName,
int partitionIndex,
@@ -202,13 +209,15 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
isFinished = true;
}
+ @Override
+ public boolean isFinished() {
+ return isFinished;
+ }
+
public void release() {
release(null);
}
- /**
- * Releases the result partition.
- */
public void release(Throwable cause) {
if (isReleased.compareAndSet(false, true)) {
LOG.debug("{}: Releasing {}.", owningTaskName, this);
@@ -248,6 +257,12 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
return numTargetKeyGroups;
}
+ @Override
+ public void setMetricGroup(TaskIOMetricGroup metrics) {
+ numBytesOut = metrics.getNumBytesOutCounter();
+ numBuffersOut = metrics.getNumBuffersOutCounter();
+ }
+
/**
* Releases buffers held by this result partition.
*
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java
index d8cd9ec..a2dd4c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java
@@ -68,9 +68,6 @@ public class OutputCollector<T> implements Collector<T> {
catch (IOException e) {
throw new RuntimeException("Emitting the record caused an I/O exception: " + e.getMessage(), e);
}
- catch (InterruptedException e) {
- throw new RuntimeException("Emitting the record was interrupted: " + e.getMessage(), e);
- }
}
else {
throw new NullPointerException("The system does not support records that are null. "
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java
index 3451939..3eca089 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java
@@ -21,17 +21,18 @@ package org.apache.flink.runtime.taskmanager;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.CheckpointedResultPartition;
import org.apache.flink.runtime.io.network.partition.CheckpointedResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
@@ -69,16 +70,6 @@ public class ConsumableNotifyingResultPartitionWriterDecorator implements Result
}
@Override
- public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
- return partitionWriter.getBufferBuilder(targetChannel);
- }
-
- @Override
- public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException {
- return partitionWriter.tryGetBufferBuilder(targetChannel);
- }
-
- @Override
public ResultPartitionID getPartitionId() {
return partitionWriter.getPartitionId();
}
@@ -99,18 +90,34 @@ public class ConsumableNotifyingResultPartitionWriterDecorator implements Result
}
@Override
- public ResultSubpartitionView createSubpartitionView(int index, BufferAvailabilityListener availabilityListener) throws IOException {
- return partitionWriter.createSubpartitionView(index, availabilityListener);
+ public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
+ partitionWriter.emitRecord(record, targetSubpartition);
+
+ notifyPipelinedConsumers();
}
@Override
- public boolean addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex) throws IOException {
- boolean success = partitionWriter.addBufferConsumer(bufferConsumer, subpartitionIndex);
- if (success) {
- notifyPipelinedConsumers();
- }
+ public void broadcastRecord(ByteBuffer record) throws IOException {
+ partitionWriter.broadcastRecord(record);
+
+ notifyPipelinedConsumers();
+ }
+
+ @Override
+ public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
+ partitionWriter.broadcastEvent(event, isPriorityEvent);
+
+ notifyPipelinedConsumers();
+ }
- return success;
+ @Override
+ public void setMetricGroup(TaskIOMetricGroup metrics) {
+ partitionWriter.setMetricGroup(metrics);
+ }
+
+ @Override
+ public ResultSubpartitionView createSubpartitionView(int index, BufferAvailabilityListener availabilityListener) throws IOException {
+ return partitionWriter.createSubpartitionView(index, availabilityListener);
}
@Override
@@ -131,6 +138,21 @@ public class ConsumableNotifyingResultPartitionWriterDecorator implements Result
}
@Override
+ public boolean isFinished() {
+ return partitionWriter.isFinished();
+ }
+
+ @Override
+ public void release(Throwable cause) {
+ partitionWriter.release(cause);
+ }
+
+ @Override
+ public boolean isReleased() {
+ return partitionWriter.isReleased();
+ }
+
+ @Override
public void fail(Throwable throwable) {
partitionWriter.fail(throwable);
}
@@ -152,7 +174,7 @@ public class ConsumableNotifyingResultPartitionWriterDecorator implements Result
* this will trigger the deployment of consuming tasks after the first buffer has been added.
*/
private void notifyPipelinedConsumers() {
- if (!hasNotifiedPipelinedConsumers) {
+ if (!hasNotifiedPipelinedConsumers && !partitionWriter.isReleased()) {
partitionConsumableNotifier.notifyPartitionConsumable(jobId, partitionWriter.getPartitionId(), taskActions);
hasNotifiedPipelinedConsumers = true;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
index e35b311..9c748db 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
@@ -18,8 +18,10 @@
package org.apache.flink.runtime.io.network.api.serialization;
+import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
@@ -116,17 +118,16 @@ public class SpanningRecordSerializationTest extends TestLogger {
// -----------------------------------------------------------------------------------------------------------------
private void testSerializationRoundTrip(Iterable<SerializationTestType> records, int segmentSize) throws Exception {
- RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
RecordDeserializer<SerializationTestType> deserializer =
new SpillingAdaptiveSpanningRecordDeserializer<>(
new String[]{ tempFolder.getRoot().getAbsolutePath() });
- testSerializationRoundTrip(records, segmentSize, serializer, deserializer);
+ testSerializationRoundTrip(records, segmentSize, deserializer);
}
/**
- * Iterates over the provided records and tests whether {@link SpanningRecordSerializer} and {@link RecordDeserializer}
- * interact as expected.
+ * Iterates over the provided records and tests whether {@link RecordWriter#serializeRecord} and
+ * {@link RecordDeserializer} interact as expected.
*
* <p>Only a single {@link MemorySegment} will be allocated.
*
@@ -136,14 +137,14 @@ public class SpanningRecordSerializationTest extends TestLogger {
private static void testSerializationRoundTrip(
Iterable<SerializationTestType> records,
int segmentSize,
- RecordSerializer<SerializationTestType> serializer,
RecordDeserializer<SerializationTestType> deserializer)
throws Exception {
+ final DataOutputSerializer serializer = new DataOutputSerializer(128);
final ArrayDeque<SerializationTestType> serializedRecords = new ArrayDeque<>();
// -------------------------------------------------------------------------------------------------------------
- BufferAndSerializerResult serializationResult = setNextBufferForSerializer(serializer, segmentSize);
+ BufferAndSerializerResult serializationResult = setNextBufferForSerializer(serializer.wrapAsByteBuffer(), segmentSize);
int numRecords = 0;
for (SerializationTestType record : records) {
@@ -153,18 +154,21 @@ public class SpanningRecordSerializationTest extends TestLogger {
numRecords++;
// serialize record
- serializer.serializeRecord(record);
- if (serializer.copyToBufferBuilder(serializationResult.getBufferBuilder()).isFullBuffer()) {
+ serializer.clear();
+ ByteBuffer serializedRecord = RecordWriter.serializeRecord(serializer, record);
+ serializationResult.getBufferBuilder().appendAndCommit(serializedRecord);
+ if (serializationResult.getBufferBuilder().isFull()) {
// buffer is full => start deserializing
deserializer.setNextBuffer(serializationResult.buildBuffer());
numRecords -= DeserializationUtils.deserializeRecords(serializedRecords, deserializer);
// move buffers as long as necessary (for long records)
- while ((serializationResult = setNextBufferForSerializer(serializer, segmentSize)).isFullBuffer()) {
+ while ((serializationResult = setNextBufferForSerializer(serializedRecord, segmentSize)).isFullBuffer()) {
deserializer.setNextBuffer(serializationResult.buildBuffer());
}
}
+ Assert.assertFalse(serializedRecord.hasRemaining());
}
// deserialize left over records
@@ -183,18 +187,16 @@ public class SpanningRecordSerializationTest extends TestLogger {
// assert that all records have been serialized and deserialized
Assert.assertEquals(0, numRecords);
- Assert.assertFalse(serializer.hasSerializedData());
Assert.assertFalse(deserializer.hasUnfinishedData());
}
@Test
public void testSmallRecordUnconsumedBuffer() throws Exception {
- RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
RecordDeserializer<SerializationTestType> deserializer =
new SpillingAdaptiveSpanningRecordDeserializer<>(
new String[]{tempFolder.getRoot().getAbsolutePath()});
- testUnconsumedBuffer(serializer, deserializer, Util.randomRecord(SerializationTestTypeFactory.INT), 1024);
+ testUnconsumedBuffer(deserializer, Util.randomRecord(SerializationTestTypeFactory.INT), 1024);
}
/**
@@ -202,33 +204,29 @@ public class SpanningRecordSerializationTest extends TestLogger {
*/
@Test
public void testSpanningRecordUnconsumedBuffer() throws Exception {
- RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
RecordDeserializer<SerializationTestType> deserializer =
new SpillingAdaptiveSpanningRecordDeserializer<>(
new String[]{tempFolder.getRoot().getAbsolutePath()});
- testUnconsumedBuffer(serializer, deserializer, Util.randomRecord(SerializationTestTypeFactory.INT), 1);
+ testUnconsumedBuffer(deserializer, Util.randomRecord(SerializationTestTypeFactory.INT), 1);
}
@Test
public void testLargeSpanningRecordUnconsumedBuffer() throws Exception {
- RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
RecordDeserializer<SerializationTestType> deserializer =
new SpillingAdaptiveSpanningRecordDeserializer<>(
new String[]{tempFolder.getRoot().getAbsolutePath()});
- testUnconsumedBuffer(serializer, deserializer, Util.randomRecord(SerializationTestTypeFactory.BYTE_ARRAY), 1);
+ testUnconsumedBuffer(deserializer, Util.randomRecord(SerializationTestTypeFactory.BYTE_ARRAY), 1);
}
@Test
public void testLargeSpanningRecordUnconsumedBufferWithLeftOverBytes() throws Exception {
- RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
RecordDeserializer<SerializationTestType> deserializer =
new SpillingAdaptiveSpanningRecordDeserializer<>(
new String[]{tempFolder.getRoot().getAbsolutePath()});
testUnconsumedBuffer(
- serializer,
deserializer,
Util.randomRecord(SerializationTestTypeFactory.BYTE_ARRAY),
1,
@@ -237,7 +235,6 @@ public class SpanningRecordSerializationTest extends TestLogger {
deserializer.clear();
testUnconsumedBuffer(
- serializer,
deserializer,
Util.randomRecord(SerializationTestTypeFactory.BYTE_ARRAY),
1,
@@ -245,17 +242,18 @@ public class SpanningRecordSerializationTest extends TestLogger {
}
public void testUnconsumedBuffer(
- RecordSerializer<SerializationTestType> serializer,
RecordDeserializer<SerializationTestType> deserializer,
SerializationTestType record,
int segmentSize,
byte... leftOverBytes) throws Exception {
try (ByteArrayOutputStream unconsumedBytes = new ByteArrayOutputStream()) {
- serializer.serializeRecord(record);
+ DataOutputSerializer serializer = new DataOutputSerializer(128);
+ ByteBuffer serializedRecord = RecordWriter.serializeRecord(serializer, record);
- BufferAndSerializerResult serializationResult = setNextBufferForSerializer(serializer, segmentSize);
+ BufferAndSerializerResult serializationResult = setNextBufferForSerializer(serializedRecord, segmentSize);
- if (serializer.copyToBufferBuilder(serializationResult.getBufferBuilder()).isFullBuffer()) {
+ serializationResult.getBufferBuilder().appendAndCommit(serializedRecord);
+ if (serializationResult.getBufferBuilder().isFull()) {
// buffer is full => start deserializing
Buffer buffer = serializationResult.buildBuffer();
writeBuffer(buffer.readOnlySlice().getNioBufferReadable(), unconsumedBytes);
@@ -265,7 +263,7 @@ public class SpanningRecordSerializationTest extends TestLogger {
deserializer.getNextRecord(record.getClass().newInstance());
// move buffers as long as necessary (for long records)
- while ((serializationResult = setNextBufferForSerializer(serializer, segmentSize)).isFullBuffer()) {
+ while ((serializationResult = setNextBufferForSerializer(serializedRecord, segmentSize)).isFullBuffer()) {
buffer = serializationResult.buildBuffer();
if (serializationResult.isFullRecord()) {
@@ -310,7 +308,7 @@ public class SpanningRecordSerializationTest extends TestLogger {
}
private static BufferAndSerializerResult setNextBufferForSerializer(
- RecordSerializer<SerializationTestType> serializer,
+ ByteBuffer serializedRecord,
int segmentSize) throws IOException {
// create a bufferBuilder with some random starting offset to properly test handling buffer slices in the
// deserialization code.
@@ -319,24 +317,29 @@ public class SpanningRecordSerializationTest extends TestLogger {
BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();
bufferConsumer.build().recycleBuffer();
+ bufferBuilder.appendAndCommit(serializedRecord);
return new BufferAndSerializerResult(
bufferBuilder,
bufferConsumer,
- serializer.copyToBufferBuilder(bufferBuilder));
+ bufferBuilder.isFull(),
+ !serializedRecord.hasRemaining());
}
private static class BufferAndSerializerResult {
private final BufferBuilder bufferBuilder;
private final BufferConsumer bufferConsumer;
- private final RecordSerializer.SerializationResult serializationResult;
+ private final boolean isFullBuffer;
+ private final boolean isFullRecord;
public BufferAndSerializerResult(
BufferBuilder bufferBuilder,
BufferConsumer bufferConsumer,
- RecordSerializer.SerializationResult serializationResult) {
+ boolean isFullBuffer,
+ boolean isFullRecord) {
this.bufferBuilder = bufferBuilder;
this.bufferConsumer = bufferConsumer;
- this.serializationResult = serializationResult;
+ this.isFullBuffer = isFullBuffer;
+ this.isFullRecord = isFullRecord;
}
public BufferBuilder getBufferBuilder() {
@@ -348,11 +351,11 @@ public class SpanningRecordSerializationTest extends TestLogger {
}
public boolean isFullBuffer() {
- return serializationResult.isFullBuffer();
+ return isFullBuffer;
}
public boolean isFullRecord() {
- return serializationResult.isFullRecord();
+ return isFullRecord;
}
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
deleted file mode 100644
index e5f5dfc..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.api.serialization;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.testutils.serialization.types.SerializationTestType;
-import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
-import org.apache.flink.testutils.serialization.types.Util;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Random;
-
-import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
-
-/**
- * Tests for the {@link SpanningRecordSerializer}.
- */
-public class SpanningRecordSerializerTest {
-
- @Test
- public void testHasSerializedData() throws IOException {
- final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
- final SerializationTestType randomIntRecord = Util.randomRecord(SerializationTestTypeFactory.INT);
-
- Assert.assertFalse(serializer.hasSerializedData());
-
- serializer.serializeRecord(randomIntRecord);
- Assert.assertTrue(serializer.hasSerializedData());
-
- final BufferBuilder bufferBuilder1 = createBufferBuilder(16);
- serializer.copyToBufferBuilder(bufferBuilder1);
- Assert.assertFalse(serializer.hasSerializedData());
-
- final BufferBuilder bufferBuilder2 = createBufferBuilder(8);
- serializer.reset();
- serializer.copyToBufferBuilder(bufferBuilder2);
- Assert.assertFalse(serializer.hasSerializedData());
-
- serializer.reset();
- serializer.copyToBufferBuilder(bufferBuilder2);
- // Buffer builder full!
- Assert.assertTrue(serializer.hasSerializedData());
- }
-
- @Test
- public void testEmptyRecords() throws IOException {
- final int segmentSize = 11;
-
- final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
- final BufferBuilder bufferBuilder1 = createBufferBuilder(segmentSize);
-
- Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD,
- serializer.copyToBufferBuilder(bufferBuilder1));
-
- SerializationTestType emptyRecord = new SerializationTestType() {
- @Override
- public SerializationTestType getRandom(Random rnd) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int length() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void write(DataOutputView out) {}
-
- @Override
- public void read(DataInputView in) {}
-
- @Override
- public int hashCode() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean equals(Object obj) {
- throw new UnsupportedOperationException();
- }
- };
-
- serializer.serializeRecord(emptyRecord);
- Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, serializer.copyToBufferBuilder(bufferBuilder1));
-
- serializer.reset();
- Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, serializer.copyToBufferBuilder(bufferBuilder1));
-
- serializer.reset();
- Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL,
- serializer.copyToBufferBuilder(bufferBuilder1));
-
- final BufferBuilder bufferBuilder2 = createBufferBuilder(segmentSize);
- Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD,
- serializer.copyToBufferBuilder(bufferBuilder2));
- }
-
- @Test
- public void testIntRecordsSpanningMultipleSegments() throws Exception {
- final int segmentSize = 1;
- final int numValues = 10;
-
- test(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize);
- }
-
- @Test
- public void testIntRecordsWithAlignedSegments() throws Exception {
- final int segmentSize = 64;
- final int numValues = 64;
-
- test(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize);
- }
-
- @Test
- public void testIntRecordsWithUnalignedSegments() throws Exception {
- final int segmentSize = 31;
- final int numValues = 248; // least common multiple => last record should align
-
- test(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize);
- }
-
- @Test
- public void testRandomRecords() throws Exception {
- final int segmentSize = 127;
- final int numValues = 100000;
-
- test(Util.randomRecords(numValues), segmentSize);
- }
-
- // -----------------------------------------------------------------------------------------------------------------
-
- /**
- * Iterates over the provided records and tests whether the {@link SpanningRecordSerializer} returns the expected
- * {@link RecordSerializer.SerializationResult} values.
- *
- * <p>Only a single {@link MemorySegment} will be allocated.
- *
- * @param records records to test
- * @param segmentSize size for the {@link MemorySegment}
- */
- private void test(Util.MockRecords records, int segmentSize) throws Exception {
- final int serializationOverhead = 4; // length encoding
-
- final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
-
- // -------------------------------------------------------------------------------------------------------------
-
- BufferBuilder bufferBuilder = createBufferBuilder(segmentSize);
- int numBytes = 0;
- for (SerializationTestType record : records) {
- serializer.serializeRecord(record);
- RecordSerializer.SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
- numBytes += record.length() + serializationOverhead;
-
- if (numBytes < segmentSize) {
- Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
- } else if (numBytes == segmentSize) {
- Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL, result);
- bufferBuilder = createBufferBuilder(segmentSize);
- numBytes = 0;
- } else {
- Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL, result);
-
- while (result.isFullBuffer()) {
- numBytes -= segmentSize;
- bufferBuilder = createBufferBuilder(segmentSize);
- result = serializer.copyToBufferBuilder(bufferBuilder);
- }
-
- Assert.assertTrue(result.isFullRecord());
- }
- }
- }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
index 84aa17e..66eb8f2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
@@ -18,79 +18,45 @@
package org.apache.flink.runtime.io.network.api.writer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.MockResultPartitionWriter;
import javax.annotation.concurrent.ThreadSafe;
import java.io.IOException;
-import java.util.ArrayDeque;
+import java.nio.ByteBuffer;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
+import static org.apache.flink.util.Preconditions.checkArgument;
/**
* {@link ResultPartitionWriter} that collects output on the List.
*/
@ThreadSafe
public abstract class AbstractCollectingResultPartitionWriter extends MockResultPartitionWriter {
- private final BufferProvider bufferProvider;
- private final ArrayDeque<BufferConsumer> bufferConsumers = new ArrayDeque<>();
-
- public AbstractCollectingResultPartitionWriter(BufferProvider bufferProvider) {
- this.bufferProvider = checkNotNull(bufferProvider);
- }
-
- @Override
- public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
- return bufferProvider.requestBufferBuilderBlocking(targetChannel);
- }
@Override
- public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException {
- return bufferProvider.requestBufferBuilder(targetChannel);
+ public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
+ checkArgument(targetSubpartition < getNumberOfSubpartitions());
+ deserializeRecord(record);
}
@Override
- public synchronized boolean addBufferConsumer(BufferConsumer bufferConsumer, int targetChannel) throws IOException {
- checkState(targetChannel < getNumberOfSubpartitions());
- bufferConsumers.add(bufferConsumer);
- processBufferConsumers();
- return true;
+ public void broadcastRecord(ByteBuffer record) throws IOException {
+ deserializeRecord(record);
}
- private void processBufferConsumers() throws IOException {
- while (!bufferConsumers.isEmpty()) {
- BufferConsumer bufferConsumer = bufferConsumers.peek();
- Buffer buffer = bufferConsumer.build();
- try {
- deserializeBuffer(buffer);
- if (!bufferConsumer.isFinished()) {
- break;
- }
- bufferConsumers.pop().close();
- }
- finally {
- buffer.recycleBuffer();
- }
- }
- }
+ private void deserializeRecord(ByteBuffer serializedRecord) throws IOException {
+ checkArgument(serializedRecord.hasArray());
- @Override
- public synchronized void flushAll() {
- try {
- processBufferConsumers();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void flush(int subpartitionIndex) {
- flushAll();
+ MemorySegment segment = MemorySegmentFactory.wrap(serializedRecord.array());
+ NetworkBuffer buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE);
+ buffer.setSize(serializedRecord.remaining());
+ deserializeBuffer(buffer);
+ buffer.recycleBuffer();
}
protected abstract void deserializeBuffer(Buffer buffer) throws IOException;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriterTest.java
index dccfd1d..d318961 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriterTest.java
@@ -22,8 +22,10 @@ import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
-import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.testutils.serialization.types.IntType;
import org.apache.flink.testutils.serialization.types.SerializationTestType;
import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
@@ -31,12 +33,12 @@ import org.apache.flink.testutils.serialization.types.Util;
import org.junit.Test;
+import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
import static org.junit.Assert.assertEquals;
@@ -56,19 +58,12 @@ public class BroadcastRecordWriterTest extends RecordWriterTest {
*/
@Test
public void testBroadcastMixedRandomEmitRecord() throws Exception {
- final int numberOfChannels = 4;
+ final int numberOfChannels = 8;
final int numberOfRecords = 8;
final int bufferSize = 32;
- @SuppressWarnings("unchecked")
- final Queue<BufferConsumer>[] queues = new Queue[numberOfChannels];
- for (int i = 0; i < numberOfChannels; i++) {
- queues[i] = new ArrayDeque<>();
- }
-
- final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
- final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
- final BroadcastRecordWriter<SerializationTestType> writer = new BroadcastRecordWriter<>(partitionWriter, 0, "test");
+ final ResultPartition partition = createResultPartition(bufferSize, numberOfChannels);
+ final BroadcastRecordWriter<SerializationTestType> writer = new BroadcastRecordWriter<>(partition, -1, "test");
final RecordDeserializer<SerializationTestType> deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>(
new String[]{ tempFolder.getRoot().getAbsolutePath() });
@@ -84,7 +79,7 @@ public class BroadcastRecordWriterTest extends RecordWriterTest {
int index = 0;
for (SerializationTestType record : records) {
int randomChannel = index++ % numberOfChannels;
- writer.randomEmit(record, randomChannel);
+ writer.emit(record, randomChannel);
serializedRecords.get(randomChannel).add(record);
writer.broadcastEmit(record);
@@ -93,23 +88,23 @@ public class BroadcastRecordWriterTest extends RecordWriterTest {
}
}
- final int numberOfCreatedBuffers = bufferProvider.getNumberOfCreatedBuffers();
+ final int numberOfCreatedBuffers = partition.getBufferPool().bestEffortGetNumOfUsedBuffers();
// verify the expected number of requested buffers, and it would always request a new buffer while random emitting
- assertEquals(numberOfRecords, numberOfCreatedBuffers);
+ assertEquals(2 * numberOfRecords, numberOfCreatedBuffers);
for (int i = 0; i < numberOfChannels; i++) {
// every channel would queue the number of above crated buffers
- assertEquals(numberOfRecords, queues[i].size());
+ assertEquals(numberOfRecords + 1, partition.getNumberOfQueuedBuffers(i));
final int excessRandomRecords = i < numberOfRecords % numberOfChannels ? 1 : 0;
final int numberOfRandomRecords = numberOfRecords / numberOfChannels + excessRandomRecords;
final int numberOfTotalRecords = numberOfRecords + numberOfRandomRecords;
// verify the data correctness in every channel queue
verifyDeserializationResults(
- queues[i],
+ partition.createSubpartitionView(i, new NoOpBufferAvailablityListener()),
deserializer,
serializedRecords.get(i),
- numberOfCreatedBuffers,
+ numberOfRecords + 1,
numberOfTotalRecords);
}
}
@@ -121,45 +116,41 @@ public class BroadcastRecordWriterTest extends RecordWriterTest {
@Test
public void testRandomEmitAndBufferRecycling() throws Exception {
int recordSize = 8;
+ int numberOfChannels = 2;
- final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(2, 2 * recordSize);
- final KeepingPartitionWriter partitionWriter = new KeepingPartitionWriter(bufferProvider) {
- @Override
- public int getNumberOfSubpartitions() {
- return 2;
- }
- };
- final BroadcastRecordWriter<SerializationTestType> writer = new BroadcastRecordWriter<>(partitionWriter, 0, "test");
+ ResultPartition partition = createResultPartition(2 * recordSize, numberOfChannels);
+ BufferPool bufferPool = partition.getBufferPool();
+ BroadcastRecordWriter<SerializationTestType> writer = new BroadcastRecordWriter<>(partition, -1, "test");
// force materialization of both buffers for easier availability tests
- List<Buffer> buffers = Arrays.asList(bufferProvider.requestBuffer(), bufferProvider.requestBuffer());
+ List<Buffer> buffers = Arrays.asList(bufferPool.requestBuffer(), bufferPool.requestBuffer());
buffers.forEach(Buffer::recycleBuffer);
- assertEquals(2, bufferProvider.getNumberOfAvailableBuffers());
+ assertEquals(2, bufferPool.getNumberOfAvailableMemorySegments());
// fill first buffer
- writer.randomEmit(new IntType(1), 0);
+ writer.broadcastEmit(new IntType(1));
writer.broadcastEmit(new IntType(2));
- assertEquals(1, bufferProvider.getNumberOfAvailableBuffers());
+ assertEquals(1, bufferPool.getNumberOfAvailableMemorySegments());
// simulate consumption of first buffer consumer; this should not free buffers
- assertEquals(1, partitionWriter.getAddedBufferConsumers(0).size());
- closeConsumer(partitionWriter, 0, 2 * recordSize);
- assertEquals(1, bufferProvider.getNumberOfAvailableBuffers());
+ assertEquals(1, partition.getNumberOfQueuedBuffers(0));
+ ResultSubpartitionView view0 = partition.createSubpartitionView(0, new NoOpBufferAvailablityListener());
+ closeConsumer(view0, 2 * recordSize);
+ assertEquals(1, bufferPool.getNumberOfAvailableMemorySegments());
// use second buffer
- writer.broadcastEmit(new IntType(3));
- assertEquals(0, bufferProvider.getNumberOfAvailableBuffers());
+ writer.emit(new IntType(3), 0);
+ assertEquals(0, bufferPool.getNumberOfAvailableMemorySegments());
// fully free first buffer
- assertEquals(2, partitionWriter.getAddedBufferConsumers(1).size());
- closeConsumer(partitionWriter, 1, recordSize);
- assertEquals(1, bufferProvider.getNumberOfAvailableBuffers());
+ assertEquals(1, partition.getNumberOfQueuedBuffers(1));
+ ResultSubpartitionView view1 = partition.createSubpartitionView(1, new NoOpBufferAvailablityListener());
+ closeConsumer(view1, 2 * recordSize);
+ assertEquals(1, bufferPool.getNumberOfAvailableMemorySegments());
}
- public void closeConsumer(KeepingPartitionWriter partitionWriter, int subpartitionIndex, int expectedSize) {
- BufferConsumer bufferConsumer = partitionWriter.getAddedBufferConsumers(subpartitionIndex).get(0);
- Buffer buffer = bufferConsumer.build();
- bufferConsumer.close();
+ public void closeConsumer(ResultSubpartitionView view, int expectedSize) throws IOException {
+ Buffer buffer = view.getNextBuffer().buffer();
assertEquals(expectedSize, buffer.getSize());
buffer.recycleBuffer();
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordCollectingResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordCollectingResultPartitionWriter.java
index 1285c4e..4a9f7ee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordCollectingResultPartitionWriter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordCollectingResultPartitionWriter.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network.api.writer;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.types.Record;
import java.io.IOException;
@@ -39,8 +38,7 @@ public class RecordCollectingResultPartitionWriter extends AbstractCollectingRes
private final RecordDeserializer<Record> deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>(
new String[]{System.getProperty("java.io.tmpdir")});
- public RecordCollectingResultPartitionWriter(List<Record> output, BufferProvider bufferProvider) {
- super(bufferProvider);
+ public RecordCollectingResultPartitionWriter(List<Record> output) {
this.output = checkNotNull(output);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordOrEventCollectingResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordOrEventCollectingResultPartitionWriter.java
index 3d3073b..5540a53 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordOrEventCollectingResultPartitionWriter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordOrEventCollectingResultPartitionWriter.java
@@ -20,11 +20,9 @@ package org.apache.flink.runtime.io.network.api.writer;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.event.AbstractEvent;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
@@ -44,35 +42,32 @@ public class RecordOrEventCollectingResultPartitionWriter<T> extends AbstractCol
public RecordOrEventCollectingResultPartitionWriter(
Collection<Object> output,
- BufferProvider bufferProvider,
TypeSerializer<T> serializer) {
- super(bufferProvider);
this.output = checkNotNull(output);
this.delegate = new NonReusingDeserializationDelegate<>(checkNotNull(serializer));
}
@Override
+ public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
+ output.add(event);
+ }
+
+ @Override
protected void deserializeBuffer(Buffer buffer) throws IOException {
- if (buffer.isBuffer()) {
- deserializer.setNextBuffer(buffer);
+ deserializer.setNextBuffer(buffer);
- while (deserializer.hasUnfinishedData()) {
- RecordDeserializer.DeserializationResult result =
- deserializer.getNextRecord(delegate);
+ while (deserializer.hasUnfinishedData()) {
+ RecordDeserializer.DeserializationResult result =
+ deserializer.getNextRecord(delegate);
- if (result.isFullRecord()) {
- output.add(delegate.getInstance());
- }
+ if (result.isFullRecord()) {
+ output.add(delegate.getInstance());
+ }
- if (result == RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER
- || result == RecordDeserializer.DeserializationResult.PARTIAL_RECORD) {
- break;
- }
+ if (result == RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER
+ || result == RecordDeserializer.DeserializationResult.PARTIAL_RECORD) {
+ break;
}
- } else {
- // is event
- AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
- output.add(event);
}
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
index adb059c..5a0ea8f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
@@ -20,28 +20,25 @@ package org.apache.flink.runtime.io.network.api.writer;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
+import org.apache.flink.types.IntValue;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -51,16 +48,17 @@ import static org.junit.Assert.assertTrue;
*/
public class RecordWriterDelegateTest extends TestLogger {
+ private static final int recordSize = 8;
+
private static final int numberOfBuffers = 10;
private static final int memorySegmentSize = 128;
- private static final int numberOfSegmentsToRequest = 2;
-
private NetworkBufferPool globalPool;
@Before
public void setup() {
+ assertEquals("Illegal memory segment size,", 0, memorySegmentSize % recordSize);
globalPool = new NetworkBufferPool(numberOfBuffers, memorySegmentSize);
}
@@ -103,11 +101,11 @@ public class RecordWriterDelegateTest extends TestLogger {
@SuppressWarnings("unchecked")
public void testSingleRecordWriterBroadcastEvent() throws Exception {
// setup
- final ArrayDeque<BufferConsumer>[] queues = new ArrayDeque[] { new ArrayDeque(), new ArrayDeque() };
- final RecordWriter recordWriter = createRecordWriter(queues);
+ final ResultPartition partition = RecordWriterTest.createResultPartition(memorySegmentSize, 2);
+ final RecordWriter recordWriter = new RecordWriterBuilder<>().build(partition);
final RecordWriterDelegate writerDelegate = new SingleRecordWriter(recordWriter);
- verifyBroadcastEvent(writerDelegate, queues, 1);
+ verifyBroadcastEvent(writerDelegate, Collections.singletonList(partition));
}
@Test
@@ -116,14 +114,16 @@ public class RecordWriterDelegateTest extends TestLogger {
// setup
final int numRecordWriters = 2;
final List<RecordWriter> recordWriters = new ArrayList<>(numRecordWriters);
- final ArrayDeque<BufferConsumer>[] queues = new ArrayDeque[] { new ArrayDeque(), new ArrayDeque() };
+ final List<ResultPartition> partitions = new ArrayList<>(numRecordWriters);
for (int i = 0; i < numRecordWriters; i++) {
- recordWriters.add(createRecordWriter(queues));
+ final ResultPartition partition = RecordWriterTest.createResultPartition(memorySegmentSize, 2);
+ partitions.add(partition);
+ recordWriters.add(new RecordWriterBuilder<>().build(partition));
}
final RecordWriterDelegate writerDelegate = new MultipleRecordWriters(recordWriters);
- verifyBroadcastEvent(writerDelegate, queues, numRecordWriters);
+ verifyBroadcastEvent(writerDelegate, partitions);
}
private RecordWriter createRecordWriter(NetworkBufferPool globalPool) throws Exception {
@@ -136,14 +136,6 @@ public class RecordWriterDelegateTest extends TestLogger {
return new RecordWriterBuilder().build(partition);
}
- private RecordWriter createRecordWriter(ArrayDeque<BufferConsumer>[] queues) {
- final ResultPartitionWriter partition = new RecordWriterTest.CollectingPartitionWriter(
- queues,
- new TestPooledBufferProvider(1));
-
- return new RecordWriterBuilder().build(partition);
- }
-
private void verifyAvailability(RecordWriterDelegate writerDelegate) throws Exception {
// writer is available at the beginning
assertTrue(writerDelegate.isAvailable());
@@ -151,13 +143,14 @@ public class RecordWriterDelegateTest extends TestLogger {
// request one buffer from the local pool to make it unavailable
RecordWriter recordWriter = writerDelegate.getRecordWriter(0);
- final BufferBuilder bufferBuilder = checkNotNull(recordWriter.getBufferBuilder(0));
+ for (int i = 0; i < memorySegmentSize / recordSize; ++i) {
+ recordWriter.emit(new IntValue(i));
+ }
assertFalse(writerDelegate.isAvailable());
CompletableFuture future = writerDelegate.getAvailableFuture();
assertFalse(future.isDone());
// recycle the buffer to make the local pool available again
- BufferBuilderTestUtils.fillBufferBuilder(bufferBuilder, 1).finish();
ResultSubpartitionView readView = recordWriter.getTargetPartition().createSubpartitionView(0, new NoOpBufferAvailablityListener());
Buffer buffer = readView.getNextBuffer().buffer();
@@ -169,18 +162,18 @@ public class RecordWriterDelegateTest extends TestLogger {
private void verifyBroadcastEvent(
RecordWriterDelegate writerDelegate,
- ArrayDeque<BufferConsumer>[] queues,
- int numRecordWriters) throws Exception {
+ List<ResultPartition> partitions) throws Exception {
final CancelCheckpointMarker message = new CancelCheckpointMarker(1);
writerDelegate.broadcastEvent(message);
// verify the added messages in all the queues
- for (int i = 0; i < queues.length; i++) {
- assertEquals(numRecordWriters, queues[i].size());
+ for (ResultPartition partition : partitions) {
+ for (int i = 0; i < partition.getNumberOfSubpartitions(); i++) {
+ assertEquals(1, partition.getNumberOfQueuedBuffers(i));
- for (int j = 0; j < numRecordWriters; j++) {
- BufferOrEvent boe = RecordWriterTest.parseBuffer(queues[i].remove(), i);
+ ResultSubpartitionView view = partition.createSubpartitionView(i, new NoOpBufferAvailablityListener());
+ BufferOrEvent boe = RecordWriterTest.parseBuffer(view.getNextBuffer().buffer(), i);
assertTrue(boe.isEvent());
assertEquals(message, boe.getEvent());
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 6cad1d3..a0956ec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -27,34 +27,33 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
-import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer.SerializationResult;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderAndConsumerTest;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.partition.MockResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.PipelinedResultPartition;
import org.apache.flink.runtime.io.network.partition.PipelinedSubpartition;
import org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionTest;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.util.DeserializationUtils;
-import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
import org.apache.flink.runtime.operators.shipping.OutputEmitter;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.taskmanager.ConsumableNotifyingResultPartitionWriterDecorator;
@@ -65,7 +64,6 @@ import org.apache.flink.testutils.serialization.types.Util;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.XORShiftRandom;
-import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -74,19 +72,13 @@ import org.junit.rules.TemporaryFolder;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.Queue;
import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicReference;
-import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer;
+import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
/**
@@ -119,35 +111,27 @@ public class RecordWriterTest {
int numberOfChannels = 4;
int bufferSize = 32;
- @SuppressWarnings("unchecked")
- Queue<BufferConsumer>[] queues = new Queue[numberOfChannels];
- for (int i = 0; i < numberOfChannels; i++) {
- queues[i] = new ArrayDeque<>();
- }
-
- TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
-
- ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
- RecordWriter<ByteArrayIO> writer = createRecordWriter(partitionWriter);
+ ResultPartition partition = createResultPartition(bufferSize, numberOfChannels);
+ RecordWriter<ByteArrayIO> writer = createRecordWriter(partition);
CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 919192L, Integer.MAX_VALUE + 18828228L, CheckpointOptions.forCheckpointWithDefaultLocation());
// No records emitted yet, broadcast should not request a buffer
writer.broadcastEvent(barrier);
- assertEquals(0, bufferProvider.getNumberOfCreatedBuffers());
+ assertEquals(0, partition.getBufferPool().bestEffortGetNumOfUsedBuffers());
for (int i = 0; i < numberOfChannels; i++) {
- assertEquals(1, queues[i].size());
- BufferOrEvent boe = parseBuffer(queues[i].remove(), i);
+ assertEquals(1, partition.getNumberOfQueuedBuffers(i));
+ ResultSubpartitionView view = partition.createSubpartitionView(i, new NoOpBufferAvailablityListener());
+ BufferOrEvent boe = parseBuffer(view.getNextBuffer().buffer(), i);
assertTrue(boe.isEvent());
assertEquals(barrier, boe.getEvent());
- assertEquals(0, queues[i].size());
+ assertFalse(view.isAvailable(Integer.MAX_VALUE));
}
}
/**
- * Tests broadcasting events when records have been emitted. The emitted
- * records cover all three {@link SerializationResult} types.
+ * Tests broadcasting events when records have been emitted.
*/
@Test
public void testBroadcastEventMixedRecords() throws Exception {
@@ -156,16 +140,8 @@ public class RecordWriterTest {
int bufferSize = 32;
int lenBytes = 4; // serialized length
- @SuppressWarnings("unchecked")
- Queue<BufferConsumer>[] queues = new Queue[numberOfChannels];
- for (int i = 0; i < numberOfChannels; i++) {
- queues[i] = new ArrayDeque<>();
- }
-
- TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
-
- ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
- RecordWriter<ByteArrayIO> writer = createRecordWriter(partitionWriter);
+ ResultPartition partition = createResultPartition(bufferSize, numberOfChannels);
+ RecordWriter<ByteArrayIO> writer = createRecordWriter(partition);
CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 1292L, Integer.MAX_VALUE + 199L, CheckpointOptions.forCheckpointWithDefaultLocation());
// Emit records on some channels first (requesting buffers), then
@@ -194,34 +170,43 @@ public class RecordWriterTest {
writer.broadcastEvent(barrier);
if (isBroadcastWriter) {
- assertEquals(3, bufferProvider.getNumberOfCreatedBuffers());
+ assertEquals(3, partition.getBufferPool().bestEffortGetNumOfUsedBuffers());
for (int i = 0; i < numberOfChannels; i++) {
- assertEquals(4, queues[i].size()); // 3 buffer + 1 event
+ assertEquals(4, partition.getNumberOfQueuedBuffers(i)); // 3 buffer + 1 event
+ ResultSubpartitionView view = partition.createSubpartitionView(i, new NoOpBufferAvailablityListener());
for (int j = 0; j < 3; j++) {
- assertTrue(parseBuffer(queues[i].remove(), 0).isBuffer());
+ assertTrue(parseBuffer(view.getNextBuffer().buffer(), 0).isBuffer());
}
- BufferOrEvent boe = parseBuffer(queues[i].remove(), i);
+ BufferOrEvent boe = parseBuffer(view.getNextBuffer().buffer(), i);
assertTrue(boe.isEvent());
assertEquals(barrier, boe.getEvent());
}
} else {
- assertEquals(4, bufferProvider.getNumberOfCreatedBuffers());
+ assertEquals(4, partition.getBufferPool().bestEffortGetNumOfUsedBuffers());
+ ResultSubpartitionView[] views = new ResultSubpartitionView[4];
+
+ assertEquals(2, partition.getNumberOfQueuedBuffers(0)); // 1 buffer + 1 event
+ views[0] = partition.createSubpartitionView(0, new NoOpBufferAvailablityListener());
+ assertTrue(parseBuffer(views[0].getNextBuffer().buffer(), 0).isBuffer());
- assertEquals(2, queues[0].size()); // 1 buffer + 1 event
- assertTrue(parseBuffer(queues[0].remove(), 0).isBuffer());
- assertEquals(3, queues[1].size()); // 2 buffers + 1 event
- assertTrue(parseBuffer(queues[1].remove(), 1).isBuffer());
- assertTrue(parseBuffer(queues[1].remove(), 1).isBuffer());
- assertEquals(2, queues[2].size()); // 1 buffer + 1 event
- assertTrue(parseBuffer(queues[2].remove(), 2).isBuffer());
- assertEquals(1, queues[3].size()); // 0 buffers + 1 event
+ assertEquals(3, partition.getNumberOfQueuedBuffers(1)); // 2 buffers + 1 event
+ views[1] = partition.createSubpartitionView(1, new NoOpBufferAvailablityListener());
+ assertTrue(parseBuffer(views[1].getNextBuffer().buffer(), 1).isBuffer());
+ assertTrue(parseBuffer(views[1].getNextBuffer().buffer(), 1).isBuffer());
+
+ assertEquals(2, partition.getNumberOfQueuedBuffers(2)); // 1 buffer + 1 event
+ views[2] = partition.createSubpartitionView(2, new NoOpBufferAvailablityListener());
+ assertTrue(parseBuffer(views[2].getNextBuffer().buffer(), 2).isBuffer());
+
+ views[3] = partition.createSubpartitionView(3, new NoOpBufferAvailablityListener());
+ assertEquals(1, partition.getNumberOfQueuedBuffers(3)); // 0 buffers + 1 event
// every queue's last element should be the event
for (int i = 0; i < numberOfChannels; i++) {
- BufferOrEvent boe = parseBuffer(queues[i].remove(), i);
+ BufferOrEvent boe = parseBuffer(views[i].getNextBuffer().buffer(), i);
assertTrue(boe.isEvent());
assertEquals(barrier, boe.getEvent());
}
@@ -234,31 +219,28 @@ public class RecordWriterTest {
*/
@Test
public void testBroadcastEventBufferReferenceCounting() throws Exception {
+ int bufferSize = 32 * 1024;
+ int numSubpartitions = 2;
- @SuppressWarnings("unchecked")
- ArrayDeque<BufferConsumer>[] queues = new ArrayDeque[] { new ArrayDeque(), new ArrayDeque() };
-
- ResultPartitionWriter partition =
- new CollectingPartitionWriter(queues, new TestPooledBufferProvider(Integer.MAX_VALUE));
+ ResultPartition partition = createResultPartition(bufferSize, numSubpartitions);
RecordWriter<?> writer = createRecordWriter(partition);
writer.broadcastEvent(EndOfPartitionEvent.INSTANCE);
- // Verify added to all queues
- assertEquals(1, queues[0].size());
- assertEquals(1, queues[1].size());
-
// get references to buffer consumers (copies from the original event buffer consumer)
- BufferConsumer bufferConsumer1 = queues[0].getFirst();
- BufferConsumer bufferConsumer2 = queues[1].getFirst();
+ Buffer[] buffers = new Buffer[numSubpartitions];
// process all collected events (recycles the buffer)
- for (int i = 0; i < queues.length; i++) {
- assertTrue(parseBuffer(queues[i].remove(), i).isEvent());
+ for (int i = 0; i < numSubpartitions; i++) {
+ assertEquals(1, partition.getNumberOfQueuedBuffers(i));
+ ResultSubpartitionView view = partition.createSubpartitionView(i, new NoOpBufferAvailablityListener());
+ buffers[i] = view.getNextBuffer().buffer();
+ assertTrue(parseBuffer(buffers[i], i).isEvent());
}
- assertTrue(bufferConsumer1.isRecycled());
- assertTrue(bufferConsumer2.isRecycled());
+ for (int i = 0; i < numSubpartitions; ++i) {
+ assertTrue(buffers[i].isRecycled());
+ }
}
/**
@@ -289,15 +271,8 @@ public class RecordWriterTest {
final int numValues = 8;
final int serializationLength = 4;
- @SuppressWarnings("unchecked")
- final Queue<BufferConsumer>[] queues = new Queue[numberOfChannels];
- for (int i = 0; i < numberOfChannels; i++) {
- queues[i] = new ArrayDeque<>();
- }
-
- final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
- final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
- final RecordWriter<SerializationTestType> writer = createRecordWriter(partitionWriter);
+ final ResultPartition partition = createResultPartition(bufferSize, numberOfChannels);
+ final RecordWriter<SerializationTestType> writer = createRecordWriter(partition);
final RecordDeserializer<SerializationTestType> deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>(
new String[]{ tempFolder.getRoot().getAbsolutePath() });
@@ -310,14 +285,15 @@ public class RecordWriterTest {
final int numRequiredBuffers = numValues / (bufferSize / (4 + serializationLength));
if (isBroadcastWriter) {
- assertEquals(numRequiredBuffers, bufferProvider.getNumberOfCreatedBuffers());
+ assertEquals(numRequiredBuffers, partition.getBufferPool().bestEffortGetNumOfUsedBuffers());
} else {
- assertEquals(numRequiredBuffers * numberOfChannels, bufferProvider.getNumberOfCreatedBuffers());
+ assertEquals(numRequiredBuffers * numberOfChannels, partition.getBufferPool().bestEffortGetNumOfUsedBuffers());
}
for (int i = 0; i < numberOfChannels; i++) {
- assertEquals(numRequiredBuffers, queues[i].size());
- verifyDeserializationResults(queues[i], deserializer, serializedRecords.clone(), numRequiredBuffers, numValues);
+ assertEquals(numRequiredBuffers, partition.getNumberOfQueuedBuffers(i));
+ ResultSubpartitionView view = partition.createSubpartitionView(i, new NoOpBufferAvailablityListener());
+ verifyDeserializationResults(view, deserializer, serializedRecords.clone(), numRequiredBuffers, numValues);
}
}
@@ -345,7 +321,7 @@ public class RecordWriterTest {
assertTrue(recordWriter.getAvailableFuture().isDone());
// request one buffer from the local pool to make it unavailable afterwards
- final BufferBuilder bufferBuilder = resultPartition.getBufferBuilder(0);
+ final BufferBuilder bufferBuilder = localPool.requestBufferBuilder(0);
assertNotNull(bufferBuilder);
assertFalse(recordWriter.getAvailableFuture().isDone());
@@ -418,64 +394,8 @@ public class RecordWriterTest {
}
}
- @Test
- public void testIdleTime() throws IOException, InterruptedException {
- // setup
- final NetworkBufferPool globalPool = new NetworkBufferPool(10, 128);
- final BufferPool localPool = globalPool.createBufferPool(1, 1, null, 1, Integer.MAX_VALUE);
- final ResultPartitionWriter resultPartition = new ResultPartitionBuilder()
- .setBufferPoolFactory(p -> localPool)
- .build();
- resultPartition.setup();
- final ResultPartitionWriter partitionWrapper = new ConsumableNotifyingResultPartitionWriterDecorator(
- new NoOpTaskActions(),
- new JobID(),
- resultPartition,
- new NoOpResultPartitionConsumableNotifier());
- final RecordWriter recordWriter = createRecordWriter(partitionWrapper);
- BufferBuilder builder = recordWriter.requestNewBufferBuilder(0);
- BufferBuilderTestUtils.fillBufferBuilder(builder, 1).finish();
- ResultSubpartitionView readView = resultPartition.createSubpartitionView(0, new NoOpBufferAvailablityListener());
- Buffer buffer = readView.getNextBuffer().buffer();
-
- // idle time is zero when there is buffer available.
- assertEquals(0, recordWriter.getIdleTimeMsPerSecond().getCount());
-
- CountDownLatch syncLock = new CountDownLatch(1);
- AtomicReference<BufferBuilder> asyncRequestResult = new AtomicReference<>();
- final Thread requestThread = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- // notify that the request thread start to run.
- syncLock.countDown();
- // wait for buffer.
- asyncRequestResult.set(recordWriter.requestNewBufferBuilder(0));
- } catch (Exception e) {
- }
- }
- });
- requestThread.start();
-
- // wait until request thread start to run.
- syncLock.await();
-
- Thread.sleep(10);
-
- //recycle the buffer
- buffer.recycleBuffer();
- requestThread.join();
-
- assertThat(recordWriter.getIdleTimeMsPerSecond().getCount(), Matchers.greaterThan(0L));
- assertNotNull(asyncRequestResult.get());
- }
-
private void verifyBroadcastBufferOrEventIndependence(boolean broadcastEvent) throws Exception {
- @SuppressWarnings("unchecked")
- ArrayDeque<BufferConsumer>[] queues = new ArrayDeque[]{new ArrayDeque(), new ArrayDeque()};
-
- ResultPartitionWriter partition =
- new CollectingPartitionWriter(queues, new TestPooledBufferProvider(Integer.MAX_VALUE));
+ ResultPartition partition = createResultPartition(4096, 2);
RecordWriter<IntValue> writer = createRecordWriter(partition);
if (broadcastEvent) {
@@ -485,12 +405,15 @@ public class RecordWriterTest {
}
// verify added to all queues
- assertEquals(1, queues[0].size());
- assertEquals(1, queues[1].size());
+ assertEquals(1, partition.getNumberOfQueuedBuffers(0));
+ assertEquals(1, partition.getNumberOfQueuedBuffers(1));
+
+ ResultSubpartitionView view0 = partition.createSubpartitionView(0, new NoOpBufferAvailablityListener());
+ ResultSubpartitionView view1 = partition.createSubpartitionView(1, new NoOpBufferAvailablityListener());
// these two buffers may share the memory but not the indices!
- Buffer buffer1 = buildSingleBuffer(queues[0].remove());
- Buffer buffer2 = buildSingleBuffer(queues[1].remove());
+ Buffer buffer1 = view0.getNextBuffer().buffer();
+ Buffer buffer2 = view1.getNextBuffer().buffer();
assertEquals(0, buffer1.getReaderIndex());
assertEquals(0, buffer2.getReaderIndex());
buffer1.setReaderIndex(1);
@@ -498,18 +421,19 @@ public class RecordWriterTest {
}
protected void verifyDeserializationResults(
- Queue<BufferConsumer> queue,
+ ResultSubpartitionView view,
RecordDeserializer<SerializationTestType> deserializer,
ArrayDeque<SerializationTestType> expectedRecords,
int numRequiredBuffers,
int numValues) throws Exception {
int assertRecords = 0;
for (int j = 0; j < numRequiredBuffers; j++) {
- Buffer buffer = buildSingleBuffer(queue.remove());
+ Buffer buffer = view.getNextBuffer().buffer();
deserializer.setNextBuffer(buffer);
assertRecords += DeserializationUtils.deserializeRecords(expectedRecords, deserializer);
}
+ assertFalse(view.isAvailable(Integer.MAX_VALUE));
Assert.assertEquals(numValues, assertRecords);
}
@@ -526,51 +450,18 @@ public class RecordWriterTest {
}
}
+ public static ResultPartition createResultPartition(int bufferSize, int numSubpartitions) throws IOException {
+ NettyShuffleEnvironment env = new NettyShuffleEnvironmentBuilder().setBufferSize(bufferSize).build();
+ ResultPartition partition = createPartition(env, ResultPartitionType.PIPELINED, numSubpartitions);
+ partition.setup();
+ return partition;
+ }
+
// ---------------------------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------------------------
- /**
- * Partition writer that collects the added buffers/events in multiple queue.
- */
- static class CollectingPartitionWriter extends MockResultPartitionWriter {
- private final Queue<BufferConsumer>[] queues;
- private final BufferProvider bufferProvider;
-
- /**
- * Create the partition writer.
- *
- * @param queues one queue per outgoing channel
- * @param bufferProvider buffer provider
- */
- CollectingPartitionWriter(Queue<BufferConsumer>[] queues, BufferProvider bufferProvider) {
- this.queues = queues;
- this.bufferProvider = bufferProvider;
- }
-
- @Override
- public int getNumberOfSubpartitions() {
- return queues.length;
- }
-
- @Override
- public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
- return bufferProvider.requestBufferBuilderBlocking(targetChannel);
- }
-
- @Override
- public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException {
- return bufferProvider.requestBufferBuilder(targetChannel);
- }
-
- @Override
- public boolean addBufferConsumer(BufferConsumer buffer, int targetChannel) {
- return queues[targetChannel].add(buffer);
- }
- }
-
- static BufferOrEvent parseBuffer(BufferConsumer bufferConsumer, int targetChannel) throws IOException {
- Buffer buffer = buildSingleBuffer(bufferConsumer);
+ static BufferOrEvent parseBuffer(Buffer buffer, int targetChannel) throws IOException {
if (buffer.isBuffer()) {
return new BufferOrEvent(buffer, new InputChannelInfo(0, targetChannel));
} else {
@@ -581,68 +472,6 @@ public class RecordWriterTest {
}
}
- /**
- * Partition writer that recycles all received buffers and does no further processing.
- */
- private static class RecyclingPartitionWriter extends MockResultPartitionWriter {
- private final BufferProvider bufferProvider;
-
- private RecyclingPartitionWriter(BufferProvider bufferProvider) {
- this.bufferProvider = bufferProvider;
- }
-
- @Override
- public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
- return bufferProvider.requestBufferBuilderBlocking(targetChannel);
- }
-
- @Override
- public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException {
- return bufferProvider.requestBufferBuilder(targetChannel);
- }
- }
-
- static class KeepingPartitionWriter extends MockResultPartitionWriter {
- private final BufferProvider bufferProvider;
- private Map<Integer, List<BufferConsumer>> produced = new HashMap<>();
-
- KeepingPartitionWriter(BufferProvider bufferProvider) {
- this.bufferProvider = bufferProvider;
- }
-
- @Override
- public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
- return bufferProvider.requestBufferBuilderBlocking(targetChannel);
- }
-
- @Override
- public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException {
- return bufferProvider.requestBufferBuilder(targetChannel);
- }
-
- @Override
- public boolean addBufferConsumer(BufferConsumer bufferConsumer, int targetChannel) {
- // keep the buffer occupied.
- produced.putIfAbsent(targetChannel, new ArrayList<>());
- produced.get(targetChannel).add(bufferConsumer);
- return true;
- }
-
- public List<BufferConsumer> getAddedBufferConsumers(int subpartitionIndex) {
- return produced.get(subpartitionIndex);
- }
-
- @Override
- public void close() {
- for (List<BufferConsumer> bufferConsumers : produced.values()) {
- for (BufferConsumer bufferConsumer : bufferConsumers) {
- bufferConsumer.close();
- }
- }
- produced.clear();
- }
- }
-
private static class ByteArrayIO implements IOReadableWritable {
private final byte[] bytes;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
index 3b6a2f5..f9616bd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
@@ -21,22 +21,21 @@ package org.apache.flink.runtime.io.network.netty;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.NetworkSequenceViewReader;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.NoOpResultSubpartitionView;
-import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.PipelinedSubpartition;
import org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionTest;
import org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
@@ -55,9 +54,11 @@ import org.junit.rules.TemporaryFolder;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createEventBufferConsumer;
+import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
@@ -478,14 +479,12 @@ public class PartitionRequestQueueTest {
}
private static ResultPartition createFinishedPartitionWithFilledData(ResultPartitionManager partitionManager) throws Exception {
- final ResultPartition partition = new ResultPartitionBuilder()
- .setResultPartitionType(ResultPartitionType.BLOCKING)
- .setFileChannelManager(fileChannelManager)
- .setResultPartitionManager(partitionManager)
- .build();
-
- partitionManager.registerResultPartition(partition);
- PartitionTestUtils.writeBuffers(partition, 1, BUFFER_SIZE);
+ NettyShuffleEnvironment environment = new NettyShuffleEnvironmentBuilder().setResultPartitionManager(partitionManager).build();
+ ResultPartition partition = createPartition(environment, fileChannelManager, ResultPartitionType.BLOCKING, 1);
+
+ partition.setup();
+ partition.emitRecord(ByteBuffer.allocate(BUFFER_SIZE), 0);
+ partition.finish();
return partition;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java
index ad21bf4..220225e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java
@@ -18,13 +18,14 @@
package org.apache.flink.runtime.io.network.partition;
+import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
/**
@@ -54,14 +55,15 @@ public class MockResultPartitionWriter implements ResultPartitionWriter {
}
@Override
- public boolean addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex) throws IOException {
- bufferConsumer.close();
- return true;
+ public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
}
@Override
- public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
- throw new UnsupportedOperationException();
+ public void broadcastRecord(ByteBuffer record) throws IOException {
+ }
+
+ @Override
+ public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
}
@Override
@@ -69,8 +71,8 @@ public class MockResultPartitionWriter implements ResultPartitionWriter {
throw new UnsupportedOperationException();
}
- public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException {
- throw new UnsupportedOperationException();
+ @Override
+ public void setMetricGroup(TaskIOMetricGroup metrics) {
}
@Override
@@ -90,6 +92,20 @@ public class MockResultPartitionWriter implements ResultPartitionWriter {
}
@Override
+ public boolean isFinished() {
+ return false;
+ }
+
+ @Override
+ public void release(Throwable cause) {
+ }
+
+ @Override
+ public boolean isReleased() {
+ return false;
+ }
+
+ @Override
public CompletableFuture<?> getAvailableFuture() {
return AVAILABLE;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index 462cc4b..1ee8dd7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -39,6 +38,8 @@ import org.apache.flink.util.TestLogger;
import org.junit.ClassRule;
import org.junit.Test;
+import java.nio.ByteBuffer;
+
/**
* Test for consuming a pipelined result only partially.
*/
@@ -118,10 +119,8 @@ public class PartialConsumePipelinedResultTest extends TestLogger {
final ResultPartitionWriter writer = getEnvironment().getWriter(0);
for (int i = 0; i < 8; i++) {
- final BufferBuilder bufferBuilder = writer.getBufferBuilder(0);
- writer.addBufferConsumer(bufferBuilder.createBufferConsumer(), 0);
+ writer.emitRecord(ByteBuffer.allocate(1024), 0);
Thread.sleep(50);
- bufferBuilder.finish();
}
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
index a545b73..c6d7819 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.PartitionDescriptorBuilder;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
@@ -31,7 +30,6 @@ import org.hamcrest.Matchers;
import java.io.IOException;
-import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledFinishedBufferConsumer;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
@@ -131,14 +129,4 @@ public enum PartitionTestUtils {
1,
true);
}
-
- public static void writeBuffers(
- ResultPartitionWriter partition,
- int numberOfBuffers,
- int bufferSize) throws IOException {
- for (int i = 0; i < numberOfBuffers; i++) {
- partition.addBufferConsumer(createFilledFinishedBufferConsumer(bufferSize), 0);
- }
- partition.finish();
- }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 95aee00..c806a19 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -23,12 +23,9 @@ import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
-import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
import org.apache.flink.runtime.io.network.util.TestProducerSource;
import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
import org.apache.flink.runtime.io.network.util.TestSubpartitionProducer;
@@ -39,7 +36,6 @@ import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
-import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -47,7 +43,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledFinishedBufferConsumer;
-import static org.apache.flink.util.Preconditions.checkState;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -141,41 +136,32 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
private void testProduceConsume(boolean isSlowProducer, boolean isSlowConsumer) throws Exception {
// Config
- final int producerBufferPoolSize = 8;
final int producerNumberOfBuffersToProduce = 128;
+ final int bufferSize = 32 * 1024;
// Producer behaviour
final TestProducerSource producerSource = new TestProducerSource() {
- private BufferProvider bufferProvider = new TestPooledBufferProvider(producerBufferPoolSize);
-
private int numberOfBuffers;
@Override
- public BufferConsumerAndChannel getNextBufferConsumer() throws Exception {
+ public BufferAndChannel getNextBuffer() throws Exception {
if (numberOfBuffers == producerNumberOfBuffersToProduce) {
return null;
}
- final BufferBuilder bufferBuilder = bufferProvider.requestBufferBuilderBlocking();
- final BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();
- int segmentSize = bufferBuilder.getMaxCapacity();
-
- MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(segmentSize);
+ MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(bufferSize);
- int next = numberOfBuffers * (segmentSize / Integer.BYTES);
+ int next = numberOfBuffers * (bufferSize / Integer.BYTES);
- for (int i = 0; i < segmentSize; i += 4) {
+ for (int i = 0; i < bufferSize; i += 4) {
segment.putInt(i, next);
next++;
}
- checkState(bufferBuilder.appendAndCommit(ByteBuffer.wrap(segment.getArray())) == segmentSize);
- bufferBuilder.finish();
-
numberOfBuffers++;
- return new BufferConsumerAndChannel(bufferConsumer, 0);
+ return new BufferAndChannel(segment.getArray(), 0);
}
};
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index e293422..d221e99 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -26,49 +26,46 @@ import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderAndConsumerTest;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.taskmanager.ConsumableNotifyingResultPartitionWriterDecorator;
import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.util.concurrent.FutureConsumerWithException;
+import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledFinishedBufferConsumer;
import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition;
import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.verifyCreateSubpartitionViewThrowsException;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
/**
* Tests for {@link ResultPartition}.
@@ -79,6 +76,8 @@ public class ResultPartitionTest {
private static FileChannelManager fileChannelManager;
+ private final int bufferSize = 1024;
+
@BeforeClass
public static void setUp() {
fileChannelManager = new FileChannelManagerImpl(new String[] {tempDir}, "testing");
@@ -115,32 +114,45 @@ public class ResultPartitionTest {
*/
@Test
public void testSendScheduleOrUpdateConsumersMessage() throws Exception {
+ FutureConsumerWithException[] notificationCalls = new FutureConsumerWithException[] {
+ writer -> ((ResultPartitionWriter) writer).finish(),
+ writer -> ((ResultPartitionWriter) writer).emitRecord(ByteBuffer.allocate(bufferSize), 0),
+ writer -> ((ResultPartitionWriter) writer).broadcastEvent(EndOfPartitionEvent.INSTANCE, false),
+ writer -> ((ResultPartitionWriter) writer).broadcastRecord(ByteBuffer.allocate(bufferSize))
+ };
+
+ for (FutureConsumerWithException notificationCall: notificationCalls) {
+ testSendScheduleOrUpdateConsumersMessage(notificationCall);
+ }
+ }
+
+ private void testSendScheduleOrUpdateConsumersMessage(
+ FutureConsumerWithException<ResultPartitionWriter, Exception> notificationCall) throws Exception {
JobID jobId = new JobID();
TaskActions taskActions = new NoOpTaskActions();
{
// Pipelined, send message => notify
- ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
+ TestResultPartitionConsumableNotifier notifier = new TestResultPartitionConsumableNotifier();
ResultPartitionWriter consumableNotifyingPartitionWriter = createConsumableNotifyingResultPartitionWriter(
ResultPartitionType.PIPELINED,
taskActions,
jobId,
notifier);
- consumableNotifyingPartitionWriter.addBufferConsumer(createFilledFinishedBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE), 0);
- verify(notifier, times(1))
- .notifyPartitionConsumable(eq(jobId), eq(consumableNotifyingPartitionWriter.getPartitionId()), eq(taskActions));
+ notificationCall.accept(consumableNotifyingPartitionWriter);
+ notifier.check(jobId, consumableNotifyingPartitionWriter.getPartitionId(), taskActions, 1);
}
{
// Blocking, send message => don't notify
- ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
+ TestResultPartitionConsumableNotifier notifier = new TestResultPartitionConsumableNotifier();
ResultPartitionWriter partition = createConsumableNotifyingResultPartitionWriter(
ResultPartitionType.BLOCKING,
taskActions,
jobId,
notifier);
- partition.addBufferConsumer(createFilledFinishedBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE), 0);
- verify(notifier, never()).notifyPartitionConsumable(eq(jobId), eq(partition.getPartitionId()), eq(taskActions));
+ notificationCall.accept(partition);
+ notifier.check(null, null, null, 0);
}
}
@@ -181,38 +193,32 @@ public class ResultPartitionTest {
}
/**
- * Tests {@link ResultPartition#addBufferConsumer} on a partition which has already finished.
+ * Tests {@link ResultPartition#emitRecord} on a partition which has already finished.
*
* @param partitionType the result partition type to set up
*/
private void testAddOnFinishedPartition(final ResultPartitionType partitionType) throws Exception {
- BufferConsumer bufferConsumer = createFilledFinishedBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE);
- ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
- JobID jobId = new JobID();
- TaskActions taskActions = new NoOpTaskActions();
- ResultPartitionWriter consumableNotifyingPartitionWriter = createConsumableNotifyingResultPartitionWriter(
- partitionType,
- taskActions,
- jobId,
- notifier);
+ TestResultPartitionConsumableNotifier notifier = new TestResultPartitionConsumableNotifier();
+ BufferWritingResultPartition bufferWritingResultPartition = createResultPartition(partitionType);
+ ResultPartitionWriter partitionWriter = ConsumableNotifyingResultPartitionWriterDecorator.decorate(
+ Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)),
+ new ResultPartitionWriter[]{bufferWritingResultPartition},
+ new NoOpTaskActions(),
+ new JobID(),
+ notifier)[0];
try {
- consumableNotifyingPartitionWriter.finish();
- reset(notifier);
- // partition.add() should fail
- consumableNotifyingPartitionWriter.addBufferConsumer(bufferConsumer, 0);
- Assert.fail("exception expected");
+ partitionWriter.finish();
+ notifier.reset();
+ // partitionWriter.emitRecord() should fail
+ partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 0);
} catch (IllegalStateException e) {
// expected => ignored
} finally {
- if (!bufferConsumer.isRecycled()) {
- bufferConsumer.close();
- Assert.fail("bufferConsumer not recycled");
- }
+ assertEquals(0, bufferWritingResultPartition.numBuffersOut.getCount());
+ assertEquals(0, bufferWritingResultPartition.numBytesOut.getCount());
+ assertEquals(0, bufferWritingResultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers());
// should not have notified either
- verify(notifier, never()).notifyPartitionConsumable(
- eq(jobId),
- eq(consumableNotifyingPartitionWriter.getPartitionId()),
- eq(taskActions));
+ notifier.check(null, null, null, 0);
}
}
@@ -227,35 +233,30 @@ public class ResultPartitionTest {
}
/**
- * Tests {@link ResultPartition#addBufferConsumer} on a partition which has already been released.
+ * Tests {@link ResultPartition#emitRecord} on a partition which has already been released.
*
* @param partitionType the result partition type to set up
*/
- private void testAddOnReleasedPartition(final ResultPartitionType partitionType) throws Exception {
- BufferConsumer bufferConsumer = createFilledFinishedBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE);
- ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
- JobID jobId = new JobID();
- TaskActions taskActions = new NoOpTaskActions();
- ResultPartition partition = partitionType == ResultPartitionType.BLOCKING ?
- createPartition(partitionType, fileChannelManager) : createPartition(partitionType);
- ResultPartitionWriter consumableNotifyingPartitionWriter = ConsumableNotifyingResultPartitionWriterDecorator.decorate(
+ private void testAddOnReleasedPartition(ResultPartitionType partitionType) throws Exception {
+ TestResultPartitionConsumableNotifier notifier = new TestResultPartitionConsumableNotifier();
+ BufferWritingResultPartition bufferWritingResultPartition = createResultPartition(partitionType);
+ ResultPartitionWriter partitionWriter = ConsumableNotifyingResultPartitionWriterDecorator.decorate(
Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)),
- new ResultPartitionWriter[] {partition},
- taskActions,
- jobId,
+ new ResultPartitionWriter[]{bufferWritingResultPartition},
+ new NoOpTaskActions(),
+ new JobID(),
notifier)[0];
try {
- partition.release();
- // partition.add() silently drops the bufferConsumer but recycles it
- consumableNotifyingPartitionWriter.addBufferConsumer(bufferConsumer, 0);
- assertTrue(partition.isReleased());
+ partitionWriter.release(null);
+ // partitionWriter.emitRecord() should silently drop the given record
+ partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 0);
} finally {
- if (!bufferConsumer.isRecycled()) {
- bufferConsumer.close();
- Assert.fail("bufferConsumer not recycled");
- }
+ assertEquals(1, bufferWritingResultPartition.numBuffersOut.getCount());
+ assertEquals(bufferSize, bufferWritingResultPartition.numBytesOut.getCount());
+ // the buffer should be recycled for the result partition has already been released
+ assertEquals(0, bufferWritingResultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers());
// should not have notified either
- verify(notifier, never()).notifyPartitionConsumable(eq(jobId), eq(partition.getPartitionId()), eq(taskActions));
+ notifier.check(null, null, null, 0);
}
}
@@ -289,32 +290,31 @@ public class ResultPartitionTest {
}
/**
- * Tests {@link ResultPartition#addBufferConsumer(BufferConsumer, int)} on a working partition.
+ * Tests {@link ResultPartition#emitRecord} on a working partition.
*
* @param partitionType the result partition type to set up
*/
private void testAddOnPartition(final ResultPartitionType partitionType) throws Exception {
- ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
+ TestResultPartitionConsumableNotifier notifier = new TestResultPartitionConsumableNotifier();
JobID jobId = new JobID();
TaskActions taskActions = new NoOpTaskActions();
- ResultPartitionWriter consumableNotifyingPartitionWriter = createConsumableNotifyingResultPartitionWriter(
- partitionType,
+ BufferWritingResultPartition bufferWritingResultPartition = createResultPartition(partitionType);
+ ResultPartitionWriter partitionWriter = ConsumableNotifyingResultPartitionWriterDecorator.decorate(
+ Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)),
+ new ResultPartitionWriter[]{bufferWritingResultPartition},
taskActions,
jobId,
- notifier);
- BufferConsumer bufferConsumer = createFilledFinishedBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE);
+ notifier)[0];
try {
- // partition.add() adds the bufferConsumer without recycling it (if not spilling)
- consumableNotifyingPartitionWriter.addBufferConsumer(bufferConsumer, 0);
- assertFalse("bufferConsumer should not be recycled (still in the queue)", bufferConsumer.isRecycled());
+ // partitionWriter.emitRecord() will allocate a new buffer and copies the record to it
+ partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 0);
} finally {
- if (!bufferConsumer.isRecycled()) {
- bufferConsumer.close();
- }
+ assertEquals(1, bufferWritingResultPartition.numBuffersOut.getCount());
+ assertEquals(bufferSize, bufferWritingResultPartition.numBytesOut.getCount());
+ assertEquals(1, bufferWritingResultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers());
// should have been notified for pipelined partitions
if (partitionType.isPipelined()) {
- verify(notifier, times(1))
- .notifyPartitionConsumable(eq(jobId), eq(consumableNotifyingPartitionWriter.getPartitionId()), eq(taskActions));
+ notifier.check(jobId, partitionWriter.getPartitionId(), taskActions, 1);
}
}
}
@@ -332,15 +332,14 @@ public class ResultPartitionTest {
private void testReleaseMemory(final ResultPartitionType resultPartitionType) throws Exception {
final int numAllBuffers = 10;
final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
- .setNumNetworkBuffers(numAllBuffers).build();
+ .setNumNetworkBuffers(numAllBuffers).setBufferSize(bufferSize).build();
final ResultPartition resultPartition = createPartition(network, resultPartitionType, 1);
try {
resultPartition.setup();
// take all buffers (more than the minimum required)
for (int i = 0; i < numAllBuffers; ++i) {
- BufferBuilder bufferBuilder = resultPartition.getBufferPool().requestBufferBuilderBlocking();
- resultPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(), 0);
+ resultPartition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
}
resultPartition.finish();
@@ -366,10 +365,12 @@ public class ResultPartitionTest {
* Tests {@link ResultPartition#getAvailableFuture()}.
*/
@Test
- public void testIsAvailableOrNot() throws IOException, InterruptedException {
+ public void testIsAvailableOrNot() throws IOException {
final int numAllBuffers = 10;
+ final int bufferSize = 1024;
final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
- .setNumNetworkBuffers(numAllBuffers).build();
+ .setNumNetworkBuffers(numAllBuffers)
+ .setBufferSize(bufferSize).build();
final ResultPartition resultPartition = createPartition(network, ResultPartitionType.PIPELINED, 1);
try {
@@ -379,8 +380,8 @@ public class ResultPartitionTest {
assertTrue(resultPartition.getAvailableFuture().isDone());
- resultPartition.getBufferBuilder(0);
- resultPartition.getBufferBuilder(0);
+ resultPartition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
+ resultPartition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
assertFalse(resultPartition.getAvailableFuture().isDone());
} finally {
resultPartition.release();
@@ -434,17 +435,25 @@ public class ResultPartitionTest {
ResultPartitionType partitionType,
TaskActions taskActions,
JobID jobId,
- ResultPartitionConsumableNotifier notifier) {
- ResultPartition partition = partitionType == ResultPartitionType.BLOCKING ?
- createPartition(partitionType, fileChannelManager) : createPartition(partitionType);
+ ResultPartitionConsumableNotifier notifier) throws IOException {
+ ResultPartition partition = createResultPartition(partitionType);
return ConsumableNotifyingResultPartitionWriterDecorator.decorate(
Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)),
- new ResultPartitionWriter[] {partition},
+ new ResultPartitionWriter[]{partition},
taskActions,
jobId,
notifier)[0];
}
+ private BufferWritingResultPartition createResultPartition(ResultPartitionType partitionType) throws IOException {
+ NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
+ .setNumNetworkBuffers(10)
+ .setBufferSize(bufferSize).build();
+ ResultPartition resultPartition = createPartition(network, fileChannelManager, partitionType, 1);
+ resultPartition.setup();
+ return (BufferWritingResultPartition) resultPartition;
+ }
+
@Test
public void testInitializeEmptyState() throws Exception {
final int totalBuffers = 2;
@@ -558,6 +567,50 @@ public class ResultPartitionTest {
}
}
+ @Test
+ public void testIdleTime() throws IOException, InterruptedException {
+ // setup
+ int bufferSize = 1024;
+ NetworkBufferPool globalPool = new NetworkBufferPool(10, bufferSize);
+ BufferPool localPool = globalPool.createBufferPool(1, 1, null, 1, Integer.MAX_VALUE);
+ BufferWritingResultPartition resultPartition = (BufferWritingResultPartition) new ResultPartitionBuilder()
+ .setBufferPoolFactory(p -> localPool)
+ .build();
+ resultPartition.setup();
+
+ resultPartition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
+ ResultSubpartitionView readView = resultPartition.createSubpartitionView(0, new NoOpBufferAvailablityListener());
+ Buffer buffer = readView.getNextBuffer().buffer();
+ assertNotNull(buffer);
+
+ // idle time is zero when there is buffer available.
+ assertEquals(0, resultPartition.getIdleTimeMsPerSecond().getCount());
+
+ CountDownLatch syncLock = new CountDownLatch(1);
+ final Thread requestThread = new Thread(() -> {
+ try {
+ // notify that the request thread start to run.
+ syncLock.countDown();
+ // wait for buffer.
+ resultPartition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
+ } catch (Exception e) {
+ }
+ });
+ requestThread.start();
+
+ // wait until request thread start to run.
+ syncLock.await();
+
+ Thread.sleep(100);
+
+ //recycle the buffer
+ buffer.recycleBuffer();
+ requestThread.join();
+
+ Assert.assertThat(resultPartition.getIdleTimeMsPerSecond().getCount(), Matchers.greaterThan(0L));
+ assertNotNull(readView.getNextBuffer().buffer());
+ }
+
/**
* The {@link ChannelStateReader} instance for restoring the specific number of states.
*/
@@ -631,4 +684,33 @@ public class ResultPartitionTest {
public void close() {
}
}
+
+ private static class TestResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier {
+ private JobID jobID;
+ private ResultPartitionID partitionID;
+ private TaskActions taskActions;
+ private int numNotification;
+
+ @Override
+ public void notifyPartitionConsumable(JobID jobID, ResultPartitionID partitionID, TaskActions taskActions) {
+ ++numNotification;
+ this.jobID = jobID;
+ this.partitionID = partitionID;
+ this.taskActions = taskActions;
+ }
+
+ private void check(JobID jobID, ResultPartitionID partitionID, TaskActions taskActions, int numNotification) {
+ assertEquals(jobID, this.jobID);
+ assertEquals(partitionID, this.partitionID);
+ assertEquals(taskActions, this.taskActions);
+ assertEquals(numNotification, this.numNotification);
+ }
+
+ private void reset() {
+ jobID = null;
+ partitionID = null;
+ taskActions = null;
+ numNotification = 0;
+ }
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
index 4a5f445..a4985cb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
@@ -19,10 +19,10 @@
package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
-import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
@@ -32,6 +32,7 @@ import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.MutableObjectIterator;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Optional;
import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
@@ -49,7 +50,7 @@ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> e
private MutableObjectIterator<T> inputIterator;
- private RecordSerializer<T> serializer;
+ private DataOutputSerializer serializer;
private final T reuse;
@@ -68,7 +69,7 @@ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> e
private IteratorWrappingTestSingleInputGate<T> wrapIterator(MutableObjectIterator<T> iterator) throws IOException, InterruptedException {
inputIterator = iterator;
- serializer = new SpanningRecordSerializer<T>();
+ serializer = new DataOutputSerializer(128);
// The input iterator can produce an infinite stream. That's why we have to serialize each
// record on demand and cannot do it upfront.
@@ -79,10 +80,10 @@ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> e
@Override
public Optional<BufferAndAvailability> getBufferAvailability() throws IOException {
if (hasData) {
- serializer.serializeRecord(reuse);
+ ByteBuffer serializedRecord = RecordWriter.serializeRecord(serializer, reuse);
BufferBuilder bufferBuilder = createBufferBuilder(bufferSize);
BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();
- serializer.copyToBufferBuilder(bufferBuilder);
+ bufferBuilder.appendAndCommit(serializedRecord);
hasData = inputIterator.next(reuse) != null;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 6e65ad74..90cbc6f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -28,13 +28,12 @@ import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.PipelinedResultPartition;
@@ -60,7 +59,6 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
@@ -135,8 +133,8 @@ public class LocalInputChannelTest {
.setNumberOfSubpartitions(parallelism)
.setNumTargetKeyGroups(parallelism)
.setResultPartitionManager(partitionManager)
- .setBufferPoolFactory(p ->
- networkBuffers.createBufferPool(producerBufferPoolSize, producerBufferPoolSize))
+ .setBufferPoolFactory(p -> networkBuffers.createBufferPool(
+ producerBufferPoolSize, producerBufferPoolSize, null, parallelism, Integer.MAX_VALUE))
.build();
// Create a buffer pool for this partition
@@ -144,11 +142,11 @@ public class LocalInputChannelTest {
// Create the producer
partitionProducers[i] = new TestPartitionProducer(
- partition,
+ (BufferWritingResultPartition) partition,
false,
new TestPartitionProducerBufferSource(
parallelism,
- partition.getBufferPool(),
+ TestBufferFactory.BUFFER_SIZE,
numberOfBuffersPerChannel)
);
}
@@ -519,16 +517,16 @@ public class LocalInputChannelTest {
*/
private static class TestPartitionProducerBufferSource implements TestProducerSource {
- private final BufferProvider bufferProvider;
+ private final int bufferSize;
private final List<Byte> channelIndexes;
public TestPartitionProducerBufferSource(
int parallelism,
- BufferProvider bufferProvider,
+ int bufferSize,
int numberOfBuffersToProduce) {
- this.bufferProvider = bufferProvider;
+ this.bufferSize = bufferSize;
this.channelIndexes = Lists.newArrayListWithCapacity(
parallelism * numberOfBuffersToProduce);
@@ -544,14 +542,10 @@ public class LocalInputChannelTest {
}
@Override
- public BufferConsumerAndChannel getNextBufferConsumer() throws Exception {
+ public BufferAndChannel getNextBuffer() throws Exception {
if (channelIndexes.size() > 0) {
final int channelIndex = channelIndexes.remove(0);
- BufferBuilder bufferBuilder = bufferProvider.requestBufferBuilderBlocking();
- BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();
- bufferBuilder.appendAndCommit(ByteBuffer.wrap(new byte[4]));
- bufferBuilder.finish();
- return new BufferConsumerAndChannel(bufferConsumer, channelIndex);
+ return new BufferAndChannel(new byte[bufferSize], channelIndex);
}
return null;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 3787627..d16ee7f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -34,7 +34,6 @@ import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.TestingConnectionManager;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderAndConsumerTest;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -42,6 +41,7 @@ import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.NoOpResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
@@ -753,7 +753,7 @@ public class SingleInputGateTest extends InputGateTestBase {
public void testQueuedBuffers() throws Exception {
final NettyShuffleEnvironment network = createNettyShuffleEnvironment();
- final ResultPartition resultPartition = new ResultPartitionBuilder()
+ final BufferWritingResultPartition resultPartition = (BufferWritingResultPartition) new ResultPartitionBuilder()
.setResultPartitionManager(network.getResultPartitionManager())
.setupBufferPoolFactoryFromNettyShuffleEnvironment(network)
.build();
@@ -784,7 +784,7 @@ public class SingleInputGateTest extends InputGateTestBase {
remoteInputChannel.onBuffer(createBuffer(1), 0, 0);
assertEquals(1, inputGate.getNumberOfQueuedBuffers());
- resultPartition.addBufferConsumer(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(1), 0);
+ resultPartition.emitRecord(ByteBuffer.allocate(1), 0);
assertEquals(2, inputGate.getNumberOfQueuedBuffers());
} finally {
resultPartition.release();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java
index 2dfb4c9..9d94402 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java
@@ -18,9 +18,10 @@
package org.apache.flink.runtime.io.network.util;
-import org.apache.flink.runtime.io.network.partition.ResultPartition;
-import org.apache.flink.runtime.io.network.util.TestProducerSource.BufferConsumerAndChannel;
+import org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition;
+import org.apache.flink.runtime.io.network.util.TestProducerSource.BufferAndChannel;
+import java.nio.ByteBuffer;
import java.util.Random;
import java.util.concurrent.Callable;
@@ -38,7 +39,7 @@ public class TestPartitionProducer implements Callable<Boolean> {
public static final int MAX_SLEEP_TIME_MS = 20;
/** The partition to add data to. */
- private final ResultPartition partition;
+ private final BufferWritingResultPartition partition;
/**
* Flag indicating whether the consumer is slow. If true, the consumer will sleep a random
@@ -53,7 +54,7 @@ public class TestPartitionProducer implements Callable<Boolean> {
private final Random random;
public TestPartitionProducer(
- ResultPartition partition,
+ BufferWritingResultPartition partition,
boolean isSlowProducer,
TestProducerSource source) {
@@ -69,10 +70,11 @@ public class TestPartitionProducer implements Callable<Boolean> {
boolean success = false;
try {
- BufferConsumerAndChannel consumerAndChannel;
+ BufferAndChannel bufferAndChannel;
- while ((consumerAndChannel = source.getNextBufferConsumer()) != null) {
- partition.addBufferConsumer(consumerAndChannel.getBufferConsumer(), consumerAndChannel.getTargetChannel());
+ while ((bufferAndChannel = source.getNextBuffer()) != null) {
+ ByteBuffer record = ByteBuffer.wrap(bufferAndChannel.getBuffer());
+ partition.emitRecord(record, bufferAndChannel.getTargetChannel());
// Check for interrupted flag after adding data to prevent resource leaks
if (Thread.interrupted()) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestProducerSource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestProducerSource.java
index f5d97f5..99006e5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestProducerSource.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestProducerSource.java
@@ -29,19 +29,19 @@ public interface TestProducerSource {
*
* <p> The channel index specifies the subpartition add the data to.
*/
- BufferConsumerAndChannel getNextBufferConsumer() throws Exception;
+ BufferAndChannel getNextBuffer() throws Exception;
- class BufferConsumerAndChannel {
- private final BufferConsumer bufferConsumer;
+ class BufferAndChannel {
+ private final byte[] buffer;
private final int targetChannel;
- public BufferConsumerAndChannel(BufferConsumer bufferConsumer, int targetChannel) {
- this.bufferConsumer = checkNotNull(bufferConsumer);
+ public BufferAndChannel(byte[] buffer, int targetChannel) {
+ this.buffer = checkNotNull(buffer);
this.targetChannel = targetChannel;
}
- public BufferConsumer getBufferConsumer() {
- return bufferConsumer;
+ public byte[] getBuffer() {
+ return buffer;
}
public int getTargetChannel() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionProducer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionProducer.java
index 60eee34..a14cdba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionProducer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionProducer.java
@@ -18,8 +18,12 @@
package org.apache.flink.runtime.io.network.util;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
-import org.apache.flink.runtime.io.network.util.TestProducerSource.BufferConsumerAndChannel;
+import org.apache.flink.runtime.io.network.util.TestProducerSource.BufferAndChannel;
import java.util.Random;
import java.util.concurrent.Callable;
@@ -69,10 +73,11 @@ public class TestSubpartitionProducer implements Callable<Boolean> {
boolean success = false;
try {
- BufferConsumerAndChannel consumerAndChannel;
+ BufferAndChannel bufferAndChannel;
- while ((consumerAndChannel = source.getNextBufferConsumer()) != null) {
- subpartition.add(consumerAndChannel.getBufferConsumer());
+ while ((bufferAndChannel = source.getNextBuffer()) != null) {
+ MemorySegment segment = MemorySegmentFactory.wrap(bufferAndChannel.getBuffer());
+ subpartition.add(new BufferConsumer(segment, MemorySegment::free, Buffer.DataType.DATA_BUFFER));
// Check for interrupted flag after adding data to prevent resource leaks
if (Thread.interrupted()) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 2865204..a28a0b5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -202,7 +202,7 @@ public class MockEnvironment implements Environment, AutoCloseable {
public void addOutput(final List<Record> outputList) {
try {
- outputs.add(new RecordCollectingResultPartitionWriter(outputList, new TestPooledBufferProvider(Integer.MAX_VALUE)));
+ outputs.add(new RecordCollectingResultPartitionWriter(outputList));
}
catch (Throwable t) {
t.printStackTrace();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index 082ddc5..a297460 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -21,11 +21,11 @@
package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
-import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel.B
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -81,7 +82,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
for (int i = 0; i < numInputChannels; i++) {
final int channelIndex = i;
- final RecordSerializer<SerializationDelegate<Object>> recordSerializer = new SpanningRecordSerializer<SerializationDelegate<Object>>();
+ final DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(128);
final SerializationDelegate<Object> delegate = (SerializationDelegate<Object>) (SerializationDelegate<?>)
new SerializationDelegate<>(new StreamElementSerializer<T>(serializer));
@@ -107,10 +108,10 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
Object inputElement = input.getStreamRecord();
delegate.setInstance(inputElement);
- recordSerializer.serializeRecord(delegate);
+ ByteBuffer serializedRecord = RecordWriter.serializeRecord(dataOutputSerializer, delegate);
BufferBuilder bufferBuilder = createBufferBuilder(bufferSize);
BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();
- recordSerializer.copyToBufferBuilder(bufferBuilder);
+ bufferBuilder.appendAndCommit(serializedRecord);
bufferBuilder.finish();
// Call getCurrentBuffer to ensure size is set
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
index 4c68ec3..48f833d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
@@ -27,9 +28,8 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
-import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
@@ -53,6 +53,7 @@ import org.junit.After;
import org.junit.Test;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -218,14 +219,15 @@ public class StreamTaskNetworkInputTest {
}
private void serializeRecord(long value, BufferBuilder bufferBuilder) throws IOException {
- RecordSerializer<SerializationDelegate<StreamElement>> serializer = new SpanningRecordSerializer<>();
+ DataOutputSerializer serializer = new DataOutputSerializer(128);
SerializationDelegate<StreamElement> serializationDelegate =
new SerializationDelegate<>(
new StreamElementSerializer<>(LongSerializer.INSTANCE));
serializationDelegate.setInstance(new StreamRecord<>(value));
- serializer.serializeRecord(serializationDelegate);
+ ByteBuffer serializedRecord = RecordWriter.serializeRecord(serializer, serializationDelegate);
+ bufferBuilder.appendAndCommit(serializedRecord);
- assertFalse(serializer.copyToBufferBuilder(bufferBuilder).isFullBuffer());
+ assertFalse(bufferBuilder.isFull());
}
private static void assertHasNextElement(StreamTaskNetworkInput input, DataOutput output) throws Exception {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index bf2b79c..447b9a0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -38,7 +38,6 @@ import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
-import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
@@ -204,7 +203,6 @@ public class StreamMockEnvironment implements Environment {
try {
outputs.add(new RecordOrEventCollectingResultPartitionWriter<T>(
outputList,
- new TestPooledBufferProvider(Integer.MAX_VALUE),
serializer));
}
catch (Throwable t) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
index 00ef9a1..87142d2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
@@ -35,7 +35,6 @@ import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.writer.NonRecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
@@ -252,11 +251,10 @@ public class SubtaskCheckpointCoordinatorTest {
.setEnvironment(mockEnvironment)
.build();
- TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(1, 4096);
ArrayList<Object> recordOrEvents = new ArrayList<>();
StreamElementSerializer<String> stringStreamElementSerializer = new StreamElementSerializer<>(StringSerializer.INSTANCE);
ResultPartitionWriter resultPartitionWriter = new RecordOrEventCollectingResultPartitionWriter<>(
- recordOrEvents, bufferProvider, stringStreamElementSerializer);
+ recordOrEvents, stringStreamElementSerializer);
mockEnvironment.addOutputs(Collections.singletonList(resultPartitionWriter));
OneInputStreamTask<String, String> task = testHarness.getTask();