You are viewing a plain-text version of this content; the canonical version is available in the original mailing-list archive.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/02 08:42:40 UTC
[6/6] flink git commit: [FLINK-5169] [network] Make consumption of InputChannels fair
[FLINK-5169] [network] Make consumption of InputChannels fair
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f728129b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f728129b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f728129b
Branch: refs/heads/master
Commit: f728129bdb8c3176fba03c3e74c65ed254146061
Parents: dbe7073
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Nov 28 09:59:29 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Dec 1 21:42:49 2016 +0100
----------------------------------------------------------------------
.../io/network/api/reader/BufferReader.java | 50 ---
.../io/network/netty/PartitionRequestQueue.java | 255 +++++--------
.../netty/PartitionRequestServerHandler.java | 40 +-
.../netty/SequenceNumberingViewReader.java | 130 +++++++
.../partition/BufferAvailabilityListener.java | 33 ++
.../partition/PipelinedSubpartition.java | 148 ++++----
.../partition/PipelinedSubpartitionView.java | 18 +-
.../io/network/partition/ResultPartition.java | 9 +-
.../partition/ResultPartitionManager.java | 5 +-
.../partition/ResultPartitionProvider.java | 3 +-
.../network/partition/ResultSubpartition.java | 6 +-
.../partition/ResultSubpartitionView.java | 9 +-
.../partition/SpillableSubpartition.java | 179 ++++-----
.../partition/SpillableSubpartitionView.java | 210 ++++++-----
.../partition/SpilledSubpartitionView.java | 223 +++++++++++
.../SpilledSubpartitionViewAsyncIO.java | 377 -------------------
.../SpilledSubpartitionViewSyncIO.java | 196 ----------
.../partition/consumer/BufferOrEvent.java | 25 +-
.../partition/consumer/InputChannel.java | 43 ++-
.../network/partition/consumer/InputGate.java | 3 +-
.../partition/consumer/InputGateListener.java | 35 ++
.../partition/consumer/LocalInputChannel.java | 212 +++++------
.../partition/consumer/RemoteInputChannel.java | 95 +++--
.../partition/consumer/SingleInputGate.java | 144 ++++---
.../partition/consumer/UnionInputGate.java | 98 ++---
.../partition/consumer/UnknownInputChannel.java | 11 +-
.../apache/flink/runtime/taskmanager/Task.java | 1 -
27 files changed, 1175 insertions(+), 1383 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
deleted file mode 100644
index ca59609..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.api.reader;
-
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-
-import java.io.IOException;
-
-/**
- * A buffer-oriented reader.
- */
-public final class BufferReader extends AbstractReader {
-
- public BufferReader(InputGate gate) {
- super(gate);
- }
-
- public Buffer getNextBuffer() throws IOException, InterruptedException {
- while (true) {
- final BufferOrEvent bufferOrEvent = inputGate.getNextBufferOrEvent();
-
- if (bufferOrEvent.isBuffer()) {
- return bufferOrEvent.getBuffer();
- }
- else {
- if (handleEvent(bufferOrEvent.getEvent())) {
- return null;
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index 094c9c7..dc80675 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -27,10 +27,10 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse;
import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
-import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
-import org.apache.flink.runtime.util.event.NotificationListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,11 +39,10 @@ import java.util.ArrayDeque;
import java.util.Queue;
import java.util.Set;
-import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse;
/**
- * A queue of partition queues, which listens for channel writability changed
+ * A queue of non-empty partition readers, which listens for channel writability changed
* events before writing and flushing {@link Buffer} instances.
*/
class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
@@ -52,12 +51,10 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
private final ChannelFutureListener writeListener = new WriteAndFlushNextMessageIfPossibleListener();
- private final Queue<SequenceNumberingSubpartitionView> queue = new ArrayDeque<SequenceNumberingSubpartitionView>();
+ private final Queue<SequenceNumberingViewReader> nonEmptyReader = new ArrayDeque<>();
private final Set<InputChannelID> released = Sets.newHashSet();
- private SequenceNumberingSubpartitionView currentPartitionQueue;
-
private boolean fatalError;
private ChannelHandlerContext ctx;
@@ -71,8 +68,22 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
super.channelRegistered(ctx);
}
- public void enqueue(ResultSubpartitionView partitionQueue, InputChannelID receiverId) throws Exception {
- ctx.pipeline().fireUserEventTriggered(new SequenceNumberingSubpartitionView(partitionQueue, receiverId));
+ void notifyReaderNonEmpty(final SequenceNumberingViewReader reader) {
+ // The notification might come from the same thread. For the initial writes this
+ // might happen before the reader has set its reference to the view, because
+ // creating the queue and the initial notification happen in the same method call.
+ // This can be resolved by separating the creation of the view and allowing
+ // notifications.
+
+ // TODO This could potentially have a bad performance impact as in the
+ // worst case (network consumes faster than the producer) each buffer
+ // will trigger a separate event loop task being scheduled.
+ ctx.executor().execute(new Runnable() {
+ @Override
+ public void run() {
+ ctx.pipeline().fireUserEventTriggered(reader);
+ }
+ });
}
public void cancel(InputChannelID receiverId) {
@@ -87,45 +98,37 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
- if (msg.getClass() == SequenceNumberingSubpartitionView.class) {
- boolean triggerWrite = queue.isEmpty();
-
- queue.add((SequenceNumberingSubpartitionView) msg);
-
+ // The user event triggered event loop callback is used for thread-safe
+ // hand over of reader queues and cancelled producers.
+
+ if (msg.getClass() == SequenceNumberingViewReader.class) {
+ // Queue a non-empty reader for consumption. If the queue
+ // was empty, we try to trigger the actual write. Otherwise this
+ // will be handled by the writeAndFlushNextMessageIfPossible calls.
+ boolean triggerWrite = nonEmptyReader.isEmpty();
+ nonEmptyReader.add((SequenceNumberingViewReader) msg);
if (triggerWrite) {
writeAndFlushNextMessageIfPossible(ctx.channel());
}
- }
- else if (msg.getClass() == InputChannelID.class) {
+ } else if (msg.getClass() == InputChannelID.class) {
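+ // Release the partition view that got a cancel request. NOTE(review): only readers currently queued in nonEmptyReader are searched, so a reader without pending data is not released here — confirm.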
InputChannelID toCancel = (InputChannelID) msg;
-
if (released.contains(toCancel)) {
return;
}
// Cancel the request for the input channel
- if (currentPartitionQueue != null && currentPartitionQueue.getReceiverId().equals(toCancel)) {
- currentPartitionQueue.releaseAllResources();
- markAsReleased(currentPartitionQueue.receiverId);
- currentPartitionQueue = null;
- }
- else {
- int size = queue.size();
-
- for (int i = 0; i < size; i++) {
- SequenceNumberingSubpartitionView curr = queue.poll();
-
- if (curr.getReceiverId().equals(toCancel)) {
- curr.releaseAllResources();
- markAsReleased(curr.receiverId);
- }
- else {
- queue.add(curr);
- }
+ int size = nonEmptyReader.size();
+ for (int i = 0; i < size; i++) {
+ SequenceNumberingViewReader reader = nonEmptyReader.poll();
+ if (reader.getReceiverId().equals(toCancel)) {
+ reader.releaseAllResources();
+ markAsReleased(reader.getReceiverId());
+ } else {
+ nonEmptyReader.add(reader);
}
}
- }
- else {
+ } else {
ctx.fireUserEventTriggered(msg);
}
}
@@ -140,64 +143,84 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
return;
}
- Buffer buffer = null;
+ // The logic here is very similar to the combined input gate and local
+ // input channel logic. You can think of this class acting as the input
+ // gate and the consumed views as the local input channels.
+ BufferAndAvailability next = null;
try {
if (channel.isWritable()) {
while (true) {
- if (currentPartitionQueue == null && (currentPartitionQueue = queue.poll()) == null) {
+ SequenceNumberingViewReader reader = nonEmptyReader.poll();
+
+ // No queue with available data. We allow this here, because
+ // of the write callbacks that are executed after each write.
+ if (reader == null) {
return;
}
- buffer = currentPartitionQueue.getNextBuffer();
+ next = reader.getNextBuffer();
- if (buffer == null) {
- if (currentPartitionQueue.registerListener(null)) {
- currentPartitionQueue = null;
- }
- else if (currentPartitionQueue.isReleased()) {
- markAsReleased(currentPartitionQueue.getReceiverId());
-
- Throwable cause = currentPartitionQueue.getFailureCause();
+ if (next == null) {
+ if (reader.isReleased()) {
+ markAsReleased(reader.getReceiverId());
+ Throwable cause = reader.getFailureCause();
if (cause != null) {
- ctx.writeAndFlush(new NettyMessage.ErrorResponse(
- new ProducerFailedException(cause),
- currentPartitionQueue.receiverId));
- }
+ ErrorResponse msg = new ErrorResponse(
+ new ProducerFailedException(cause),
+ reader.getReceiverId());
- currentPartitionQueue = null;
+ ctx.writeAndFlush(msg);
+ }
+ } else {
+ IllegalStateException err = new IllegalStateException(
+ "Bug in Netty consumer logic: reader queue got notified by partition " +
+ "about available data, but none was available.");
+ handleException(ctx.channel(), err);
+ return;
+ }
+ } else {
+ // This reader has just been removed from the non-empty reader queue.
+ // We re-add it in case it has more data, because in that case no further
+ // "non-empty" notification will arrive for that reader.
+ if (next.moreAvailable()) {
+ nonEmptyReader.add(reader);
}
- }
- else {
- BufferResponse resp = new BufferResponse(buffer, currentPartitionQueue.getSequenceNumber(), currentPartitionQueue.getReceiverId());
- if (!buffer.isBuffer() &&
- EventSerializer.fromBuffer(buffer, getClass().getClassLoader()).getClass() == EndOfPartitionEvent.class) {
+ BufferResponse msg = new BufferResponse(
+ next.buffer(),
+ reader.getSequenceNumber(),
+ reader.getReceiverId());
- currentPartitionQueue.notifySubpartitionConsumed();
- currentPartitionQueue.releaseAllResources();
- markAsReleased(currentPartitionQueue.getReceiverId());
+ if (isEndOfPartitionEvent(next.buffer())) {
+ reader.notifySubpartitionConsumed();
+ reader.releaseAllResources();
- currentPartitionQueue = null;
+ markAsReleased(reader.getReceiverId());
}
- channel.writeAndFlush(resp).addListener(writeListener);
+ // Write and flush and wait until this is done before
+ // trying to continue with the next buffer.
+ channel.writeAndFlush(msg).addListener(writeListener);
return;
}
}
}
- }
- catch (Throwable t) {
- if (buffer != null) {
- buffer.recycle();
+ } catch (Throwable t) {
+ if (next != null) {
+ next.buffer().recycle();
}
throw new IOException(t.getMessage(), t);
}
}
+ private boolean isEndOfPartitionEvent(Buffer buffer) throws IOException {
+ return !buffer.isBuffer() && EventSerializer.fromBuffer(buffer, getClass().getClassLoader()).getClass() == EndOfPartitionEvent.class;
+ }
+
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
releaseAllResources();
@@ -215,22 +238,15 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
releaseAllResources();
if (channel.isActive()) {
- channel.writeAndFlush(new NettyMessage.ErrorResponse(cause)).addListener(ChannelFutureListener.CLOSE);
+ channel.writeAndFlush(new ErrorResponse(cause)).addListener(ChannelFutureListener.CLOSE);
}
}
private void releaseAllResources() throws IOException {
- if (currentPartitionQueue != null) {
- currentPartitionQueue.releaseAllResources();
- markAsReleased(currentPartitionQueue.getReceiverId());
-
- currentPartitionQueue = null;
- }
-
- while ((currentPartitionQueue = queue.poll()) != null) {
- currentPartitionQueue.releaseAllResources();
-
- markAsReleased(currentPartitionQueue.getReceiverId());
+ SequenceNumberingViewReader reader;
+ while ((reader = nonEmptyReader.poll()) != null) {
+ reader.releaseAllResources();
+ markAsReleased(reader.getReceiverId());
}
}
@@ -241,7 +257,7 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
released.add(receiverId);
}
- // This listener is called after an element of the current queue has been
+ // This listener is called after a message for one of the non-empty readers has been
// flushed. If successful, the listener triggers further processing of the
// queues.
private class WriteAndFlushNextMessageIfPossibleListener implements ChannelFutureListener {
@@ -251,87 +267,14 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
try {
if (future.isSuccess()) {
writeAndFlushNextMessageIfPossible(future.channel());
- }
- else if (future.cause() != null) {
+ } else if (future.cause() != null) {
handleException(future.channel(), future.cause());
- }
- else {
+ } else {
handleException(future.channel(), new IllegalStateException("Sending cancelled by user."));
}
- }
- catch (Throwable t) {
+ } catch (Throwable t) {
handleException(future.channel(), t);
}
}
}
-
- /**
- * Simple wrapper for the partition queue iterator, which increments a
- * sequence number for each returned buffer and remembers the receiver ID.
- */
- private class SequenceNumberingSubpartitionView implements ResultSubpartitionView, NotificationListener {
-
- private final ResultSubpartitionView queueIterator;
-
- private final InputChannelID receiverId;
-
- private int sequenceNumber = -1;
-
- private SequenceNumberingSubpartitionView(ResultSubpartitionView queueIterator, InputChannelID receiverId) {
- this.queueIterator = checkNotNull(queueIterator);
- this.receiverId = checkNotNull(receiverId);
- }
-
- private InputChannelID getReceiverId() {
- return receiverId;
- }
-
- private int getSequenceNumber() {
- return sequenceNumber;
- }
-
- @Override
- public Buffer getNextBuffer() throws IOException, InterruptedException {
- Buffer buffer = queueIterator.getNextBuffer();
-
- if (buffer != null) {
- sequenceNumber++;
- }
-
- return buffer;
- }
-
- @Override
- public void notifySubpartitionConsumed() throws IOException {
- queueIterator.notifySubpartitionConsumed();
- }
-
- @Override
- public boolean isReleased() {
- return queueIterator.isReleased();
- }
-
- @Override
- public Throwable getFailureCause() {
- return queueIterator.getFailureCause();
- }
-
- @Override
- public boolean registerListener(NotificationListener ignored) throws IOException {
- return queueIterator.registerListener(this);
- }
-
- @Override
- public void releaseAllResources() throws IOException {
- queueIterator.releaseAllResources();
- }
-
- /**
- * Enqueue this iterator again after a notification.
- */
- @Override
- public void onNotification() {
- ctx.pipeline().fireUserEventTriggered(this);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
index e278d07..12b52ec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.io.network.netty.NettyMessage.CancelPartitionReq
import org.apache.flink.runtime.io.network.netty.NettyMessage.CloseRequest;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
-import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,10 +52,10 @@ class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMes
private BufferPool bufferPool;
PartitionRequestServerHandler(
- ResultPartitionProvider partitionProvider,
- TaskEventDispatcher taskEventDispatcher,
- PartitionRequestQueue outboundQueue,
- NetworkBufferPool networkBufferPool) {
+ ResultPartitionProvider partitionProvider,
+ TaskEventDispatcher taskEventDispatcher,
+ PartitionRequestQueue outboundQueue,
+ NetworkBufferPool networkBufferPool) {
this.partitionProvider = partitionProvider;
this.taskEventDispatcher = taskEventDispatcher;
@@ -94,15 +93,16 @@ class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMes
LOG.debug("Read channel on {}: {}.", ctx.channel().localAddress(), request);
try {
- ResultSubpartitionView subpartition =
- partitionProvider.createSubpartitionView(
- request.partitionId,
- request.queueIndex,
- bufferPool);
-
- outboundQueue.enqueue(subpartition, request.receiverId);
- }
- catch (PartitionNotFoundException notFound) {
+ SequenceNumberingViewReader reader = new SequenceNumberingViewReader(
+ request.receiverId,
+ outboundQueue);
+
+ reader.requestSubpartitionView(
+ partitionProvider,
+ request.partitionId,
+ request.queueIndex,
+ bufferPool);
+ } catch (PartitionNotFoundException notFound) {
respondWithError(ctx, notFound, request.receiverId);
}
}
@@ -115,20 +115,16 @@ class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMes
if (!taskEventDispatcher.publish(request.partitionId, request.event)) {
respondWithError(ctx, new IllegalArgumentException("Task event receiver not found."), request.receiverId);
}
- }
- else if (msgClazz == CancelPartitionRequest.class) {
+ } else if (msgClazz == CancelPartitionRequest.class) {
CancelPartitionRequest request = (CancelPartitionRequest) msg;
outboundQueue.cancel(request.receiverId);
- }
- else if (msgClazz == CloseRequest.class) {
+ } else if (msgClazz == CloseRequest.class) {
outboundQueue.close();
- }
- else {
+ } else {
LOG.warn("Received unexpected client request: {}", msg);
}
- }
- catch (Throwable t) {
+ } catch (Throwable t) {
respondWithError(ctx, t);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
new file mode 100644
index 0000000..ef611eb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
+import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Simple wrapper around a {@link ResultSubpartitionView}, which increments a
+ * sequence number for each returned buffer and remembers the receiver ID.
+ *
+ * <p>It also keeps track of available buffers and notifies the outbound
+ * handler about non-emptiness, similar to the {@link LocalInputChannel}.
+ */
+class SequenceNumberingViewReader implements BufferAvailabilityListener {
+
+ private final Object requestLock = new Object(); // guards the one-time creation of subpartitionView
+
+ private final InputChannelID receiverId;
+
+ private final AtomicLong numBuffersAvailable = new AtomicLong(); // buffers announced via notifyBuffersAvailable() but not yet retrieved
+
+ private final PartitionRequestQueue requestQueue;
+
+ private volatile ResultSubpartitionView subpartitionView; // null until requestSubpartitionView() has succeeded
+
+ private int sequenceNumber = -1; // incremented per returned buffer, so the first buffer gets sequence number 0
+
+ SequenceNumberingViewReader(InputChannelID receiverId, PartitionRequestQueue requestQueue) {
+ this.receiverId = receiverId;
+ this.requestQueue = requestQueue;
+ }
+
+ void requestSubpartitionView(
+ ResultPartitionProvider partitionProvider,
+ ResultPartitionID resultPartitionId,
+ int subPartitionIndex,
+ BufferProvider bufferProvider) throws IOException {
+
+ synchronized (requestLock) {
+ if (subpartitionView == null) {
+ // As this call can trigger a notification, we have to
+ // schedule a separate task at the event loop that will
+ // start consuming this. Otherwise the reference to the
+ // view might not yet be available in getNextBuffer().
+ this.subpartitionView = partitionProvider.createSubpartitionView(
+ resultPartitionId,
+ subPartitionIndex,
+ bufferProvider,
+ this);
+ } else {
+ throw new IllegalStateException("Subpartition already requested");
+ }
+ }
+ }
+
+ InputChannelID getReceiverId() {
+ return receiverId;
+ }
+
+ int getSequenceNumber() {
+ return sequenceNumber;
+ }
+
+ public BufferAndAvailability getNextBuffer() throws IOException, InterruptedException {
+ Buffer next = subpartitionView.getNextBuffer();
+ if (next != null) {
+ long remaining = numBuffersAvailable.decrementAndGet();
+ sequenceNumber++;
+
+ if (remaining >= 0) { // a negative count means more buffers were taken than announced via notifyBuffersAvailable()
+ return new BufferAndAvailability(next, remaining > 0);
+ } else {
+ throw new IllegalStateException("no buffer available"); // NOTE(review): 'next' is not recycled on this path — possible buffer leak, confirm
+ }
+ } else {
+ return null;
+ }
+ }
+
+ public void notifySubpartitionConsumed() throws IOException {
+ subpartitionView.notifySubpartitionConsumed();
+ }
+
+ public boolean isReleased() {
+ return subpartitionView.isReleased();
+ }
+
+ public Throwable getFailureCause() {
+ return subpartitionView.getFailureCause();
+ }
+
+ public void releaseAllResources() throws IOException {
+ subpartitionView.releaseAllResources();
+ }
+
+ @Override
+ public void notifyBuffersAvailable(long numBuffers) {
+ // if this notification made the reader non-empty, notify the request queue
+ if (numBuffers > 0 && numBuffersAvailable.getAndAdd(numBuffers) == 0) {
+ requestQueue.notifyReaderNonEmpty(this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java
new file mode 100644
index 0000000..114ef7c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+/**
+ * Listener interface implemented by consumers of {@link ResultSubpartitionView}
+ * that want to be notified when further buffers become available.
+ */
+public interface BufferAvailabilityListener {
+
+ /**
+ * Called whenever new buffers become available for consumption.
+ *
+ * @param numBuffers The number of buffers that became available.
+ */
+ void notifyBuffersAvailable(long numBuffers);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 3981a26..e9400f0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -22,7 +22,7 @@ import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.util.event.NotificationListener;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,6 +30,7 @@ import java.io.IOException;
import java.util.ArrayDeque;
import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
/**
* A pipelined in-memory only subpartition, which can be consumed once.
@@ -38,51 +39,47 @@ class PipelinedSubpartition extends ResultSubpartition {
private static final Logger LOG = LoggerFactory.getLogger(PipelinedSubpartition.class);
+ // ------------------------------------------------------------------------
+
+ /** All buffers of this subpartition. Access to the buffers is synchronized on this object. */
+ private final ArrayDeque<Buffer> buffers = new ArrayDeque<>();
+
+ /** The read view to consume this subpartition. */
+ private PipelinedSubpartitionView readView;
+
/** Flag indicating whether the subpartition has been finished. */
private boolean isFinished;
/** Flag indicating whether the subpartition has been released. */
private volatile boolean isReleased;
- /**
- * A data availability listener. Registered, when the consuming task is faster than the
- * producing task.
- */
- private NotificationListener registeredListener;
-
- /** The read view to consume this subpartition. */
- private PipelinedSubpartitionView readView;
-
- /** All buffers of this subpartition. Access to the buffers is synchronized on this object. */
- final ArrayDeque<Buffer> buffers = new ArrayDeque<Buffer>();
+ // ------------------------------------------------------------------------
PipelinedSubpartition(int index, ResultPartition parent) {
super(index, parent);
}
@Override
- public boolean add(Buffer buffer) {
+ public boolean add(Buffer buffer) throws IOException {
checkNotNull(buffer);
- final NotificationListener listener;
+ // view reference accessible outside the lock, but assigned inside the locked scope
+ final PipelinedSubpartitionView reader;
synchronized (buffers) {
- if (isReleased || isFinished) {
+ if (isFinished || isReleased) {
return false;
}
// Add the buffer and update the stats
buffers.add(buffer);
+ reader = readView;
updateStatistics(buffer);
-
- // Get the listener...
- listener = registeredListener;
- registeredListener = null;
}
// Notify the listener outside of the synchronized block
- if (listener != null) {
- listener.onNotification();
+ if (reader != null) {
+ reader.notifyBuffersAvailable(1);
}
return true;
@@ -90,36 +87,34 @@ class PipelinedSubpartition extends ResultSubpartition {
@Override
public void finish() throws IOException {
- final NotificationListener listener;
+ // Serialize the end-of-partition event before taking the lock.
+ // NOTE(review): on the early return below this buffer is not recycled.
+ final Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+
+ // view reference accessible outside the lock, but assigned inside the locked scope
+ final PipelinedSubpartitionView reader;
synchronized (buffers) {
- if (isReleased || isFinished) {
+ if (isFinished || isReleased) {
return;
}
- final Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
-
buffers.add(buffer);
+ reader = readView;
updateStatistics(buffer);
isFinished = true;
-
- LOG.debug("Finished {}.", this);
-
- // Get the listener...
- listener = registeredListener;
- registeredListener = null;
}
+ LOG.debug("Finished {}.", this);
+
// Notify the listener outside of the synchronized block
- if (listener != null) {
- listener.onNotification();
+ if (reader != null) {
+ reader.notifyBuffersAvailable(1);
}
}
@Override
public void release() {
- final NotificationListener listener;
+ // view reference accessible outside the lock, but assigned inside the locked scope
final PipelinedSubpartitionView view;
synchronized (buffers) {
@@ -130,40 +125,35 @@ class PipelinedSubpartition extends ResultSubpartition {
// Release all available buffers
Buffer buffer;
while ((buffer = buffers.poll()) != null) {
- if (!buffer.isRecycled()) {
- buffer.recycle();
- }
+ buffer.recycle();
}
// Get the view...
view = readView;
readView = null;
- // Get the listener...
- listener = registeredListener;
- registeredListener = null;
-
// Make sure that no further buffers are added to the subpartition
isReleased = true;
-
- LOG.debug("Released {}.", this);
}
+ LOG.debug("Released {}.", this);
+
// Release all resources of the view
if (view != null) {
view.releaseAllResources();
}
+ }
- // Notify the listener outside of the synchronized block
- if (listener != null) {
- listener.onNotification();
+ /** Removes and returns the head of the buffer queue, or {@code null} if the queue is empty. */
+ Buffer pollBuffer() {
+ synchronized (buffers) {
+ return buffers.pollFirst();
}
}
@Override
public int releaseMemory() {
- // The pipelined subpartition does not react to memory release requests. The buffers will be
- // recycled by the consuming task.
+ // The pipelined subpartition does not react to memory release requests, so
+ // no buffers are freed here. The buffers will be recycled by the consuming task.
return 0;
}
@@ -173,53 +163,43 @@ class PipelinedSubpartition extends ResultSubpartition {
}
@Override
- public PipelinedSubpartitionView createReadView(BufferProvider bufferProvider) {
- synchronized (buffers) {
- if (readView != null) {
- throw new IllegalStateException("Subpartition " + index + " of "
- + parent.getPartitionId() + " is being or already has been " +
- "consumed, but pipelined subpartitions can only be consumed once.");
- }
+ public PipelinedSubpartitionView createReadView(BufferProvider bufferProvider, BufferAvailabilityListener availabilityListener) throws IOException {
+ // Creates the single read view of this subpartition and announces the
+ // buffers that were queued before the view existed. Fails with an
+ // IllegalStateException if released or if a view already exists.
+ final PipelinedSubpartitionView view;
+ final int queueSize;
- readView = new PipelinedSubpartitionView(this);
+ synchronized (buffers) {
+ checkState(!isReleased, "Subpartition %s of %s has been released.", index, parent.getPartitionId());
+ checkState(readView == null,
+ "Subpartition %s of %s is being (or already has been) consumed, " +
+ "but pipelined subpartitions can only be consumed once.", index, parent.getPartitionId());
- LOG.debug("Created read view for subpartition {} of partition {}.", index, parent.getPartitionId());
+ LOG.debug("Creating read view for subpartition {} of partition {}.", index, parent.getPartitionId());
- return readView;
+ queueSize = buffers.size();
+ view = new PipelinedSubpartitionView(this, availabilityListener);
+ readView = view;
}
+
+ // Notify outside of the lock. Use the local reference: a concurrent
+ // release() may already have reset the readView field to null.
+ view.notifyBuffersAvailable(queueSize);
+
+ return view;
}
@Override
public String toString() {
- synchronized (buffers) {
- return String.format("PipelinedSubpartition [number of buffers: %d (%d bytes), " +
- "finished? %s, read view? %s]",
- getTotalNumberOfBuffers(), getTotalNumberOfBytes(), isFinished, readView != null);
- }
- }
+ final long numBuffers;
+ final long numBytes;
+ final boolean finished;
+ final boolean hasReadView;
+ // Snapshot the state while holding the lock; format outside of it.
- /**
- * Registers a listener with this subpartition and returns whether the registration was
- * successful.
- *
- * <p> A registered listener is notified when the state of the subpartition changes. After a
- * notification, the listener is unregistered. Only a single listener is allowed to be
- * registered.
- */
- boolean registerListener(NotificationListener listener) {
synchronized (buffers) {
- if (!buffers.isEmpty() || isReleased) {
- return false;
- }
-
- if (registeredListener == null) {
- registeredListener = listener;
-
- return true;
- }
-
- throw new IllegalStateException("Already registered listener.");
+ numBuffers = getTotalNumberOfBuffers();
+ numBytes = getTotalNumberOfBytes();
+ finished = isFinished;
+ hasReadView = readView != null;
}
+
+ return String.format(
+ "PipelinedSubpartition [number of buffers: %d (%d bytes), finished? %s, read view? %s]",
+ numBuffers, numBytes, finished, hasReadView);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
index f8d81a4..52c78ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
@@ -19,8 +19,8 @@
package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.util.event.NotificationListener;
+import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -33,23 +33,25 @@ class PipelinedSubpartitionView implements ResultSubpartitionView {
/** The subpartition this view belongs to. */
private final PipelinedSubpartition parent;
+ /** Listener to which the buffer availability notifications of this view are forwarded. */
+ private final BufferAvailabilityListener availabilityListener;
+
/** Flag indicating whether this view has been released. */
- private AtomicBoolean isReleased = new AtomicBoolean();
+ private final AtomicBoolean isReleased;
- PipelinedSubpartitionView(PipelinedSubpartition parent) {
+ PipelinedSubpartitionView(PipelinedSubpartition parent, BufferAvailabilityListener listener) {
this.parent = checkNotNull(parent);
+ this.availabilityListener = checkNotNull(listener);
+ this.isReleased = new AtomicBoolean();
}
@Override
public Buffer getNextBuffer() {
- synchronized (parent.buffers) {
- return parent.buffers.poll();
- }
+ // Polls the parent's queue (under its lock); null if no buffer is queued.
+ return parent.pollBuffer();
}
@Override
- public boolean registerListener(NotificationListener listener) {
- return !isReleased.get() && parent.registerListener(listener);
+ public void notifyBuffersAvailable(long numBuffers) throws IOException {
+ // Forward to the listener passed in at construction.
+ availabilityListener.notifyBuffersAvailable(numBuffers);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 834318c..474c25c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
@@ -135,7 +134,6 @@ public class ResultPartition implements BufferPoolOwner {
ResultPartitionManager partitionManager,
ResultPartitionConsumableNotifier partitionConsumableNotifier,
IOManager ioManager,
- IOMode defaultIoMode,
boolean sendScheduleOrUpdateConsumersMessage) {
this.owningTaskName = checkNotNull(owningTaskName);
@@ -152,8 +150,7 @@ public class ResultPartition implements BufferPoolOwner {
switch (partitionType) {
case BLOCKING:
for (int i = 0; i < subpartitions.length; i++) {
- subpartitions[i] = new SpillableSubpartition(
- i, this, ioManager, defaultIoMode);
+ subpartitions[i] = new SpillableSubpartition(i, this, ioManager);
}
break;
@@ -340,7 +337,7 @@ public class ResultPartition implements BufferPoolOwner {
/**
* Returns the requested subpartition.
*/
- public ResultSubpartitionView createSubpartitionView(int index, BufferProvider bufferProvider) throws IOException {
+ public ResultSubpartitionView createSubpartitionView(int index, BufferProvider bufferProvider, BufferAvailabilityListener availabilityListener) throws IOException {
int refCnt = pendingReferences.get();
checkState(refCnt != -1, "Partition released.");
@@ -348,7 +345,7 @@ public class ResultPartition implements BufferPoolOwner {
checkElementIndex(index, subpartitions.length, "Subpartition not found.");
- ResultSubpartitionView readView = subpartitions[index].createReadView(bufferProvider);
+ ResultSubpartitionView readView = subpartitions[index].createReadView(bufferProvider, availabilityListener);
LOG.debug("Created {}", readView);
http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
index 9da3e14..8ad3e34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
@@ -66,7 +66,8 @@ public class ResultPartitionManager implements ResultPartitionProvider {
public ResultSubpartitionView createSubpartitionView(
ResultPartitionID partitionId,
int subpartitionIndex,
- BufferProvider bufferProvider) throws IOException {
+ BufferProvider bufferProvider,
+ BufferAvailabilityListener availabilityListener) throws IOException {
synchronized (registeredPartitions) {
final ResultPartition partition = registeredPartitions.get(partitionId.getProducerId(),
@@ -78,7 +79,7 @@ public class ResultPartitionManager implements ResultPartitionProvider {
LOG.debug("Requesting subpartition {} of {}.", subpartitionIndex, partition);
- return partition.createSubpartitionView(subpartitionIndex, bufferProvider);
+ return partition.createSubpartitionView(subpartitionIndex, bufferProvider, availabilityListener);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
index 23dd1d3..3fbfd49 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
@@ -30,6 +30,7 @@ public interface ResultPartitionProvider {
ResultSubpartitionView createSubpartitionView(
ResultPartitionID partitionId,
int index,
- BufferProvider bufferProvider) throws IOException;
+ BufferProvider bufferProvider,
+ // receives buffer availability notifications of the created view
+ BufferAvailabilityListener availabilityListener) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
index 31c8f73..dd0e152 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
@@ -37,7 +37,7 @@ public abstract class ResultSubpartition {
// - Statistics ----------------------------------------------------------
/** The total number of buffers (both data and event buffers) */
- private int totalNumberOfBuffers;
+ private long totalNumberOfBuffers; // NOTE(review): presumably widened from int to long to rule out overflow — confirm
/** The total number of bytes (both data and event buffers) */
private long totalNumberOfBytes;
@@ -52,7 +52,7 @@ public abstract class ResultSubpartition {
totalNumberOfBytes += buffer.getSize();
}
+ /** Returns the total number of buffers (both data and event buffers) counted by this subpartition. */
- protected int getTotalNumberOfBuffers() {
+ protected long getTotalNumberOfBuffers() {
return totalNumberOfBuffers;
}
@@ -77,7 +77,7 @@ public abstract class ResultSubpartition {
abstract public void release() throws IOException;
- abstract public ResultSubpartitionView createReadView(BufferProvider bufferProvider) throws IOException;
+ /** Creates the read view of this subpartition; the listener is handed to the view for buffer availability notifications. */
+ abstract public ResultSubpartitionView createReadView(BufferProvider bufferProvider, BufferAvailabilityListener availabilityListener) throws IOException;
abstract int releaseMemory() throws IOException;
http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
index cfc5455..98be90f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.util.event.NotificationListener;
import java.io.IOException;
@@ -41,13 +40,7 @@ public interface ResultSubpartitionView {
*/
Buffer getNextBuffer() throws IOException, InterruptedException;
- /**
- * Subscribes to data availability notifications.
- * <p>
- * Returns whether the subscription was successful. A subscription fails,
- * if there is data available.
- */
- boolean registerListener(NotificationListener listener) throws IOException;
+ /**
+ * Called with the number of buffers that became available to this view.
+ * Implementations may forward this to their availability listener, or
+ * ignore it if they notify their listener by other means.
+ */
+ void notifyBuffersAvailable(long buffers) throws IOException;
void releaseAllResources() throws IOException;
http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 3f19559..439e08d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -18,42 +18,54 @@
package org.apache.flink.runtime.io.network.partition;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.ArrayList;
+import java.util.ArrayDeque;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * A blocking in-memory subpartition, which is able to spill to disk.
+ * A spillable subpartition starts out in-memory and spills to disk if asked
+ * to do so.
*
- * <p> Buffers are kept in-memory as long as possible. If not possible anymore, all buffers are
- * spilled to disk.
+ * <p>Buffers for the partition come from a {@link BufferPool}. The buffer pool
+ * is also responsible for triggering the release of the buffers if it needs them
+ * back. At this point, the spillable subpartition writes all in-memory
+ * buffers to disk. All buffers added after that point go directly to disk.
+ *
+ * <p>This partition type is used for {@link ResultPartitionType#BLOCKING}
+ * results that are fully produced before they can be consumed. At the point
+ * when they are consumed, the buffers are (i) all in-memory, (ii) currently
+ * being spilled to disk, or (iii) completely spilled to disk. Depending on
+ * this state, different reader variants are returned (see
+ * {@link SpillableSubpartitionView} and {@link SpilledSubpartitionView}).
+ *
+ * <p>Since the network buffer pool size is usually quite small (default is
+ * {@link ConfigConstants#DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS}), most
+ * spillable partitions will be spilled for real-world data sets.
*/
class SpillableSubpartition extends ResultSubpartition {
private static final Logger LOG = LoggerFactory.getLogger(SpillableSubpartition.class);
- /** All buffers of this subpartition. */
- final ArrayList<Buffer> buffers = new ArrayList<Buffer>();
+ /**
+ * In-memory buffers of this subpartition; they are removed from this queue when
+ * spilled to disk. Access is synchronized on this object. The queue is shared
+ * with the {@link SpillableSubpartitionView} created for in-memory consumption.
+ */
+ private final ArrayDeque<Buffer> buffers = new ArrayDeque<>();
- /** The I/O manager to create the spill writer from. */
- final IOManager ioManager;
-
- /** The default I/O mode to use. */
- final IOMode ioMode;
+ /** The I/O manager used for spilling buffers to disk. */
+ private final IOManager ioManager;
/** The writer used for spilling. As long as this is null, we are in-memory. */
- BufferFileWriter spillWriter;
+ private BufferFileWriter spillWriter;
/** Flag indicating whether the subpartition has been finished. */
private boolean isFinished;
@@ -64,11 +76,10 @@ class SpillableSubpartition extends ResultSubpartition {
/** The read view to consume this subpartition. */
private ResultSubpartitionView readView;
- SpillableSubpartition(int index, ResultPartition parent, IOManager ioManager, IOMode ioMode) {
+ SpillableSubpartition(int index, ResultPartition parent, IOManager ioManager) {
super(index, parent);
this.ioManager = checkNotNull(ioManager);
- this.ioMode = checkNotNull(ioMode);
}
@Override
@@ -80,7 +91,6 @@ class SpillableSubpartition extends ResultSubpartition {
return false;
}
- // In-memory
if (spillWriter == null) {
buffers.add(buffer);
@@ -88,7 +98,7 @@ class SpillableSubpartition extends ResultSubpartition {
}
}
- // Else: Spilling
+ // Didn't return early => go to disk
spillWriter.writeBlock(buffer);
return true;
@@ -102,7 +112,7 @@ class SpillableSubpartition extends ResultSubpartition {
}
}
- // If we are spilling/have spilled, wait for the writer to finish.
+ // If we are spilling/have spilled, wait for the writer to finish
if (spillWriter != null) {
spillWriter.close();
}
@@ -117,51 +127,93 @@ class SpillableSubpartition extends ResultSubpartition {
return;
}
- // Recycle all in-memory buffers
- for (Buffer buffer : buffers) {
- buffer.recycle();
- }
-
- buffers.clear();
- buffers.trimToSize();
+ view = readView;
- // If we are spilling/have spilled, wait for the writer to finish and delete the file.
- if (spillWriter != null) {
- spillWriter.closeAndDelete();
+ // No consumer yet, so we are responsible for cleaning everything up.
+ // If one is available, the view is responsible for cleaning up (see
+ // below).
+ if (view == null) {
+ for (Buffer buffer : buffers) {
+ buffer.recycle();
+ }
+ buffers.clear();
+
+ // TODO This can block until all buffers are written out to
+ // disk if a spill is in-progress before deleting the file.
+ // It is possibly called from the Netty event loop threads,
+ // which can bring down the network.
+ if (spillWriter != null) {
+ spillWriter.closeAndDelete();
+ }
}
- // Get the view...
- view = readView;
- readView = null;
-
isReleased = true;
}
- // Release the view outside of the synchronized block
if (view != null) {
- view.notifySubpartitionConsumed();
+ view.releaseAllResources();
+ }
+ }
+
+ @Override
+ public ResultSubpartitionView createReadView(BufferProvider bufferProvider, BufferAvailabilityListener availabilityListener) throws IOException {
+ // NOTE(review): unlike PipelinedSubpartition, there is no isReleased check
+ // here; confirm callers never create a view after release().
+ synchronized (buffers) {
+ if (!isFinished) {
+ throw new IllegalStateException("Subpartition has not been finished yet, " +
+ "but blocking subpartitions can only be consumed after they have " +
+ "been finished.");
+ }
+
+ if (readView != null) {
+ throw new IllegalStateException("Subpartition is being or already has been " +
+ "consumed, but we currently allow subpartitions to only be consumed once.");
+ }
+
+ // Spilling has started (or finished): read the buffers back from the spill file.
+ if (spillWriter != null) {
+ readView = new SpilledSubpartitionView(
+ this,
+ bufferProvider.getMemorySegmentSize(),
+ spillWriter,
+ getTotalNumberOfBuffers(),
+ availabilityListener);
+ } else {
+ // Still in-memory: the view consumes the shared buffer queue and
+ // spills the remaining buffers itself if asked to release memory.
+ readView = new SpillableSubpartitionView(
+ this,
+ buffers,
+ ioManager,
+ bufferProvider.getMemorySegmentSize(),
+ availabilityListener);
+ }
+
+ return readView;
+ }
}
@Override
public int releaseMemory() throws IOException {
synchronized (buffers) {
- if (spillWriter == null) {
- // Create the spill writer
+ ResultSubpartitionView view = readView;
+
+ if (view != null && view.getClass() == SpillableSubpartitionView.class) {
+ // If there is a spillable view, it's the responsibility of the
+ // view to release memory.
+ SpillableSubpartitionView spillableView = (SpillableSubpartitionView) view;
+ return spillableView.releaseMemory();
+ } else if (spillWriter == null) {
+ // No view and in-memory => spill to disk
spillWriter = ioManager.createBufferFileWriter(ioManager.createChannel());
- final int numberOfBuffers = buffers.size();
-
+ int numberOfBuffers = buffers.size();
long spilledBytes = 0;
// Spill all buffers
for (int i = 0; i < numberOfBuffers; i++) {
- Buffer buffer = buffers.remove(0);
+ Buffer buffer = buffers.remove();
spilledBytes += buffer.getSize();
spillWriter.writeBlock(buffer);
}
- LOG.debug("Spilled {} bytes for sub partition {} of {}.", spilledBytes, index, parent.getPartitionId());
+ LOG.debug("Spilling {} bytes for sub partition {} of {}.", spilledBytes, index, parent.getPartitionId());
return numberOfBuffers;
}
@@ -177,47 +229,8 @@ class SpillableSubpartition extends ResultSubpartition {
}
@Override
- public ResultSubpartitionView createReadView(BufferProvider bufferProvider) throws IOException {
- synchronized (buffers) {
- if (!isFinished) {
- throw new IllegalStateException("Subpartition has not been finished yet, " +
- "but blocking subpartitions can only be consumed after they have " +
- "been finished.");
- }
-
- if (readView != null) {
- throw new IllegalStateException("Subpartition is being or already has been " +
- "consumed, but we currently allow subpartitions to only be consumed once.");
- }
-
- // Spilled if closed and no outstanding write requests
- boolean isSpilled = spillWriter != null && (spillWriter.isClosed()
- || spillWriter.getNumberOfOutstandingRequests() == 0);
-
- if (isSpilled) {
- if (ioMode.isSynchronous()) {
- readView = new SpilledSubpartitionViewSyncIO(
- this,
- bufferProvider.getMemorySegmentSize(),
- spillWriter.getChannelID(),
- 0);
- }
- else {
- readView = new SpilledSubpartitionViewAsyncIO(
- this,
- bufferProvider,
- ioManager,
- spillWriter.getChannelID(),
- 0);
- }
- }
- else {
- readView = new SpillableSubpartitionView(
- this, bufferProvider, buffers.size(), ioMode);
- }
-
- return readView;
- }
+ public int getNumberOfQueuedBuffers() {
+ // Read without synchronizing on buffers, so the value may be stale.
+ return buffers.size();
}
@Override
@@ -228,8 +241,4 @@ class SpillableSubpartition extends ResultSubpartition {
spillWriter != null);
}
- @Override
- public int getNumberOfQueuedBuffers() {
- return buffers.size();
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
index 29c2002..8119ecc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
@@ -18,15 +18,14 @@
package org.apache.flink.runtime.io.network.partition;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
+import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.util.event.NotificationListener;
import java.io.IOException;
+import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicBoolean;
-import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
class SpillableSubpartitionView implements ResultSubpartitionView {
@@ -34,146 +33,163 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
/** The subpartition this view belongs to. */
private final SpillableSubpartition parent;
- /** The buffer provider to read buffers into (spilling case). */
- private final BufferProvider bufferProvider;
+ /** All buffers of this subpartition. Access to the buffers is synchronized on this object. */
+ private final ArrayDeque<Buffer> buffers;
- /** The number of buffers in-memory at the subpartition. */
- private final int numberOfBuffers;
+ /** IO manager if we need to spill (for spilled case). */
+ private final IOManager ioManager;
- /** The default I/O mode to use. */
- private final IOMode ioMode;
+ /** Size of memory segments (for spilled case). */
+ private final int memorySegmentSize;
- private ResultSubpartitionView spilledView;
-
- private int currentQueuePosition;
-
- private long currentBytesRead;
+ /**
+ * The buffer availability listener. As long as in-memory, notifications
+ * happen on a buffer per buffer basis as spilling may happen after a
+ * notification has been sent out.
+ */
+ private final BufferAvailabilityListener listener;
private final AtomicBoolean isReleased = new AtomicBoolean(false);
- public SpillableSubpartitionView(
- SpillableSubpartition parent,
- BufferProvider bufferProvider,
- int numberOfBuffers,
- IOMode ioMode) {
-
- this.parent = checkNotNull(parent);
- this.bufferProvider = checkNotNull(bufferProvider);
- checkArgument(numberOfBuffers >= 0);
- this.numberOfBuffers = numberOfBuffers;
- this.ioMode = checkNotNull(ioMode);
- }
-
- @Override
- public Buffer getNextBuffer() throws IOException, InterruptedException {
-
- if (isReleased.get()) {
- return null;
- }
-
- // 1) In-memory
- synchronized (parent.buffers) {
- if (parent.isReleased()) {
- return null;
- }
+ /**
+ * The next buffer to hand out. Everytime this is set to a non-null value,
+ * a listener notification happens.
+ */
+ private Buffer nextBuffer;
- if (parent.spillWriter == null) {
- if (currentQueuePosition < numberOfBuffers) {
- Buffer buffer = parent.buffers.get(currentQueuePosition);
+ private volatile SpilledSubpartitionView spilledView;
- buffer.retain();
+ SpillableSubpartitionView(
+ SpillableSubpartition parent,
+ ArrayDeque<Buffer> buffers,
+ IOManager ioManager,
+ int memorySegmentSize,
+ BufferAvailabilityListener listener) {
- // TODO Fix hard coding of 8 bytes for the header
- currentBytesRead += buffer.getSize() + 8;
- currentQueuePosition++;
-
- return buffer;
- }
+ this.parent = checkNotNull(parent);
+ this.buffers = checkNotNull(buffers);
+ this.ioManager = checkNotNull(ioManager);
+ this.memorySegmentSize = memorySegmentSize;
+ this.listener = checkNotNull(listener);
- return null;
- }
+ synchronized (buffers) {
+ nextBuffer = buffers.poll();
}
- // 2) Spilled
- if (spilledView != null) {
- return spilledView.getNextBuffer();
+ if (nextBuffer != null) {
+ listener.notifyBuffersAvailable(1);
}
+ }
- // 3) Spilling
- // Make sure that all buffers are written before consuming them. We can't block here,
- // because this might be called from an network I/O thread.
- if (parent.spillWriter.getNumberOfOutstandingRequests() > 0) {
- return null;
- }
+ int releaseMemory() throws IOException {
+ synchronized (buffers) {
+ if (spilledView != null || nextBuffer == null) {
+ // Already spilled or nothing in-memory
+ return 0;
+ } else {
+ // We don't touch next buffer, because a notification has
+ // already been sent for it. Only when it is consumed, will
+ // it be recycled.
+
+ // Create the spill writer and write all buffers to disk
+ BufferFileWriter spillWriter = ioManager.createBufferFileWriter(ioManager.createChannel());
+
+ int numBuffers = buffers.size();
+ for (int i = 0; i < numBuffers; i++) {
+ Buffer buffer = buffers.remove();
+ try {
+ spillWriter.writeBlock(buffer);
+ } finally {
+ buffer.recycle();
+ }
+ }
- if (ioMode.isSynchronous()) {
- spilledView = new SpilledSubpartitionViewSyncIO(
- parent,
- bufferProvider.getMemorySegmentSize(),
- parent.spillWriter.getChannelID(),
- currentBytesRead);
- }
- else {
- spilledView = new SpilledSubpartitionViewAsyncIO(
+ spilledView = new SpilledSubpartitionView(
parent,
- bufferProvider,
- parent.ioManager,
- parent.spillWriter.getChannelID(),
- currentBytesRead);
- }
+ memorySegmentSize,
+ spillWriter,
+ numBuffers,
+ listener);
- return spilledView.getNextBuffer();
+ return numBuffers;
+ }
+ }
}
@Override
- public boolean registerListener(NotificationListener listener) throws IOException {
- if (spilledView == null) {
- synchronized (parent.buffers) {
- // Didn't spill yet, buffers should be in-memory
- if (parent.spillWriter == null) {
- return false;
+ public Buffer getNextBuffer() throws IOException, InterruptedException {
+ synchronized (buffers) {
+ if (isReleased.get()) {
+ return null;
+ } else if (nextBuffer != null) {
+ Buffer current = nextBuffer;
+ nextBuffer = buffers.poll();
+
+ if (nextBuffer != null) {
+ listener.notifyBuffersAvailable(1);
}
- }
- // Spilling
- if (parent.spillWriter.getNumberOfOutstandingRequests() > 0) {
- return parent.spillWriter.registerAllRequestsProcessedListener(listener);
+ return current;
}
+ } // else: spilled
- return false;
+ SpilledSubpartitionView spilled = spilledView;
+ if (spilled != null) {
+ return spilled.getNextBuffer();
+ } else {
+ throw new IllegalStateException("No in-memory buffers available, but also nothing spilled.");
}
-
- return spilledView.registerListener(listener);
}
+ /**
+ * No-op: this view notifies its own listener one buffer at a time from
+ * {@link #getNextBuffer()}, so incoming availability notifications are ignored.
+ */
@Override
- public void notifySubpartitionConsumed() throws IOException {
- parent.onConsumedSubpartition();
+ public void notifyBuffersAvailable(long buffers) throws IOException {
+ // We do the availability listener notification one by one
}
+ /**
+ * Marks this view as released (only the first call has an effect) and releases the
+ * spilled view, if one has been created.
+ *
+ * <p>NOTE(review): in-memory buffers are not recycled here; presumably the parent
+ * subpartition takes care of them. Confirm.
+ */
@Override
public void releaseAllResources() throws IOException {
if (isReleased.compareAndSet(false, true)) {
- if (spilledView != null) {
- spilledView.releaseAllResources();
+ // Read spilledView once so the null check and the release see the same reference.
+ SpilledSubpartitionView spilled = spilledView;
+ if (spilled != null) {
+ spilled.releaseAllResources();
}
}
}
+ /**
+ * Forwards the consumption notification to the spilled view if data was spilled,
+ * otherwise directly to the parent subpartition.
+ */
@Override
+ public void notifySubpartitionConsumed() throws IOException {
+ SpilledSubpartitionView spilled = spilledView;
+ if (spilled != null) {
+ spilled.notifySubpartitionConsumed();
+ } else {
+ parent.onConsumedSubpartition();
+ }
+ }
+
+ /**
+ * Returns whether this view is released. Once data has been spilled, the spilled view's
+ * state decides; before that, either this view or its parent being released counts.
+ */
+ @Override
public boolean isReleased() {
- return parent.isReleased() || isReleased.get();
+ SpilledSubpartitionView spilled = spilledView;
+ if (spilled != null) {
+ return spilled.isReleased();
+ } else {
+ return parent.isReleased() || isReleased.get();
+ }
}
+ /** Returns the failure cause from the spilled view if data was spilled, otherwise from the parent. */
@Override
public Throwable getFailureCause() {
- return parent.getFailureCause();
+ SpilledSubpartitionView spilled = spilledView;
+ if (spilled != null) {
+ return spilled.getFailureCause();
+ } else {
+ return parent.getFailureCause();
+ }
}
+ /** Describes this view by its subpartition index and the id of the owning result partition. */
@Override
public String toString() {
return String.format("SpillableSubpartitionView(index: %d) of ResultPartition %s",
- parent.index,
- parent.parent.getPartitionId());
+ parent.index,
+ parent.parent.getPartitionId());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
new file mode 100644
index 0000000..b087a4e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader;
+import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
+import org.apache.flink.runtime.io.disk.iomanager.SynchronousBufferFileReader;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.util.event.NotificationListener;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Reader for a spilled sub partition.
+ *
+ * <p>The partition availability listener is notified about available buffers
+ * only when the spilling is done. Spilling is done async and if it is still
+ * in progress, we wait with the notification until the spilling is done.
+ *
+ * <p>Reads of the spilled file are done synchronously, into a small private
+ * pool of two buffers (see {@link SpillReadBufferPool}).
+ */
+class SpilledSubpartitionView implements ResultSubpartitionView, NotificationListener {
+
+ /** The subpartition this view belongs to. */
+ private final ResultSubpartition parent;
+
+ /** Writer for spills. */
+ private final BufferFileWriter spillWriter;
+
+ /** The synchronous file reader to do the actual I/O. */
+ private final BufferFileReader fileReader;
+
+ /** The buffer pool to read data into. */
+ private final SpillReadBufferPool bufferPool;
+
+ /** Buffer availability listener. */
+ private final BufferAvailabilityListener availabilityListener;
+
+ /** The total number of spilled buffers. */
+ private final long numberOfSpilledBuffers;
+
+ /** Flag indicating whether all resources have been released. */
+ private final AtomicBoolean isReleased = new AtomicBoolean();
+
+ /** Flag indicating whether a spill is still in progress. */
+ private volatile boolean isSpillInProgress = true;
+
+ SpilledSubpartitionView(
+ ResultSubpartition parent,
+ int memorySegmentSize,
+ BufferFileWriter spillWriter,
+ long numberOfSpilledBuffers,
+ BufferAvailabilityListener availabilityListener) throws IOException {
+
+ this.parent = checkNotNull(parent);
+ this.bufferPool = new SpillReadBufferPool(2, memorySegmentSize);
+ this.spillWriter = checkNotNull(spillWriter);
+ this.fileReader = new SynchronousBufferFileReader(spillWriter.getChannelID(), false);
+ checkArgument(numberOfSpilledBuffers >= 0);
+ this.numberOfSpilledBuffers = numberOfSpilledBuffers;
+ this.availabilityListener = checkNotNull(availabilityListener);
+
+ // Check whether async spilling is still in progress. If not, this returns
+ // false and we can notify our availability listener about all available buffers.
+ // Otherwise, we notify only when the spill writer callback happens.
+ if (!spillWriter.registerAllRequestsProcessedListener(this)) {
+ isSpillInProgress = false;
+ availabilityListener.notifyBuffersAvailable(numberOfSpilledBuffers);
+ }
+ }
+
+ /**
+ * This is the call back method for the spill writer. If a spill is still
+ * in progress when this view is created we wait until this method is called
+ * before we notify the availability listener.
+ */
+ @Override
+ public void onNotification() {
+ isSpillInProgress = false;
+ availabilityListener.notifyBuffersAvailable(numberOfSpilledBuffers);
+ }
+
+ /**
+ * Reads the next spilled buffer from disk.
+ *
+ * <p>Returns {@code null} if this view has been released, if the spill is still in
+ * progress, or if the end of the spill file has been reached. May block until a buffer
+ * returned by an earlier call has been recycled back into the read pool.
+ */
+ @Override
+ public Buffer getNextBuffer() throws IOException, InterruptedException {
+ // Once released, the file reader is closed; do not touch it any more.
+ if (isReleased.get() || fileReader.hasReachedEndOfFile() || isSpillInProgress) {
+ return null;
+ }
+
+ // TODO This is fragile as we implicitly expect that multiple calls to
+ // this method don't happen before recycling buffers returned earlier.
+ Buffer buffer = bufferPool.requestBufferBlocking();
+ if (buffer == null) {
+ // The pool returns null only after it has been destroyed, i.e. this view
+ // was released concurrently. Report "no data" instead of failing with a
+ // NullPointerException inside readInto.
+ return null;
+ }
+
+ boolean success = false;
+ try {
+ fileReader.readInto(buffer);
+ success = true;
+ return buffer;
+ } finally {
+ if (!success) {
+ // The pool only holds two buffers; losing one on a failed read could
+ // block every later call, so hand it back before propagating the error.
+ buffer.recycle();
+ }
+ }
+ }
+
+ @Override
+ public void notifyBuffersAvailable(long buffers) throws IOException {
+ // We do the availability listener notification either directly on
+ // construction of this view (when everything has been spilled) or
+ // as soon as spilling is done and we are notified about it in the
+ // #onNotification callback.
+ }
+
+ @Override
+ public void notifySubpartitionConsumed() throws IOException {
+ parent.onConsumedSubpartition();
+ }
+
+ /**
+ * Deletes the spill file, closes the reader and destroys the read pool. Only the first
+ * call has an effect. Every step is attempted even if an earlier one throws.
+ */
+ @Override
+ public void releaseAllResources() throws IOException {
+ if (isReleased.compareAndSet(false, true)) {
+ // TODO This can block until all buffers are written out to
+ // disk if a spill is in-progress before deleting the file.
+ // It is possibly called from the Netty event loop threads,
+ // which can bring down the network.
+ try {
+ spillWriter.closeAndDelete();
+ } finally {
+ try {
+ fileReader.close();
+ } finally {
+ // Also wakes up any reader blocked in requestBufferBlocking().
+ bufferPool.destroy();
+ }
+ }
+ }
+ }
+
+ @Override
+ public boolean isReleased() {
+ return parent.isReleased() || isReleased.get();
+ }
+
+ @Override
+ public Throwable getFailureCause() {
+ return parent.getFailureCause();
+ }
+
+ @Override
+ public String toString() {
+ return String.format("SpilledSubpartitionView[sync](index: %d) of ResultPartition %s", parent.index, parent.parent.getPartitionId());
+ }
+
+ /**
+ * A buffer pool to provide buffers to read the file into.
+ *
+ * <p>This pool ensures that a consuming input gate makes progress in all cases, even when all
+ * buffers of the input gate buffer pool have been requested by remote input channels.
+ */
+ private static class SpillReadBufferPool implements BufferRecycler {
+
+ /** Available buffers; also the monitor guarding {@link #isDestroyed}. */
+ private final Queue<Buffer> buffers;
+
+ /** Set by {@link #destroy()}; guarded by the {@code buffers} monitor. */
+ private boolean isDestroyed;
+
+ SpillReadBufferPool(int numberOfBuffers, int memorySegmentSize) {
+ this.buffers = new ArrayDeque<>(numberOfBuffers);
+
+ synchronized (buffers) {
+ for (int i = 0; i < numberOfBuffers; i++) {
+ buffers.add(new Buffer(MemorySegmentFactory.allocateUnpooledSegment(memorySegmentSize), this));
+ }
+ }
+ }
+
+ /** Returns a segment to the pool, or frees it if the pool has already been destroyed. */
+ @Override
+ public void recycle(MemorySegment memorySegment) {
+ synchronized (buffers) {
+ if (isDestroyed) {
+ memorySegment.free();
+ } else {
+ buffers.add(new Buffer(memorySegment, this));
+ buffers.notifyAll();
+ }
+ }
+ }
+
+ /**
+ * Blocks until a buffer is available and returns it, or returns {@code null}
+ * once the pool has been destroyed.
+ */
+ private Buffer requestBufferBlocking() throws InterruptedException {
+ synchronized (buffers) {
+ while (true) {
+ if (isDestroyed) {
+ return null;
+ }
+
+ Buffer buffer = buffers.poll();
+
+ if (buffer != null) {
+ return buffer;
+ }
+ // Else: wait for a buffer
+ buffers.wait();
+ }
+ }
+ }
+
+ /** Marks the pool destroyed and wakes up all threads waiting for a buffer. */
+ private void destroy() {
+ synchronized (buffers) {
+ isDestroyed = true;
+ buffers.notifyAll();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
deleted file mode 100644
index ca25536..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
+++ /dev/null
@@ -1,377 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition;
-
-import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader;
-import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.RequestDoneCallback;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.util.event.NotificationListener;
-
-import java.io.IOException;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * View over a spilled subpartition.
- *
- * <p> Reads are triggered asynchronously in batches of configurable size.
- */
-class SpilledSubpartitionViewAsyncIO implements ResultSubpartitionView {
-
- private final static int DEFAULT_READ_BATCH_SIZE = 2;
-
- private final Object lock = new Object();
-
- /** The subpartition this view belongs to. */
- private final ResultSubpartition parent;
-
- /** The buffer provider to get the buffer read everything into. */
- private final BufferProvider bufferProvider;
-
- /** The buffer availability listener to be notified on available buffers. */
- private final BufferProviderCallback bufferAvailabilityListener;
-
- /** The size of read batches. */
- private final int readBatchSize;
-
- /**
- * The size of the current batch (>= 0 and <= the configured batch size). Reads are only
- * triggered when the size of the current batch is 0.
- */
- private final AtomicInteger currentBatchSize = new AtomicInteger();
-
- /** The asynchronous file reader to do the actual I/O. */
- private final BufferFileReader asyncFileReader;
-
- /** The buffers, which have been returned from the file reader. */
- private final ConcurrentLinkedQueue<Buffer> returnedBuffers = new ConcurrentLinkedQueue<Buffer>();
-
- /** A data availability listener. */
- private final AtomicReference<NotificationListener> registeredListener;
-
- /** Error, which has occurred in the I/O thread. */
- private volatile IOException errorInIOThread;
-
- /** Flag indicating whether all resources have been released. */
- private volatile boolean isReleased;
-
- /** Flag indicating whether we reached EOF at the file reader. */
- private volatile boolean hasReachedEndOfFile;
-
- /** Spilled file size */
- private final long fileSize;
-
- SpilledSubpartitionViewAsyncIO(
- ResultSubpartition parent,
- BufferProvider bufferProvider,
- IOManager ioManager,
- FileIOChannel.ID channelId,
- long initialSeekPosition) throws IOException {
-
- this(parent, bufferProvider, ioManager, channelId, initialSeekPosition, DEFAULT_READ_BATCH_SIZE);
- }
-
- SpilledSubpartitionViewAsyncIO(
- ResultSubpartition parent,
- BufferProvider bufferProvider,
- IOManager ioManager,
- FileIOChannel.ID channelId,
- long initialSeekPosition,
- int readBatchSize) throws IOException {
-
- checkArgument(initialSeekPosition >= 0, "Initial seek position is < 0.");
- checkArgument(readBatchSize >= 1, "Batch read size < 1.");
-
- this.parent = checkNotNull(parent);
- this.bufferProvider = checkNotNull(bufferProvider);
- this.bufferAvailabilityListener = new BufferProviderCallback(this);
- this.registeredListener = new AtomicReference<>();
-
- this.asyncFileReader = ioManager.createBufferFileReader(channelId, new IOThreadCallback(this));
-
- if (initialSeekPosition > 0) {
- asyncFileReader.seekToPosition(initialSeekPosition);
- }
-
- this.readBatchSize = readBatchSize;
-
- this.fileSize = asyncFileReader.getSize();
-
- // Trigger the initial read requests
- readNextBatchAsync();
- }
-
- @Override
- public Buffer getNextBuffer() throws IOException {
- checkError();
-
- final Buffer buffer = returnedBuffers.poll();
-
- // No buffer returned from the I/O thread currently. Either the current batch is in progress
- // or we trigger the next one.
- if (buffer == null) {
- if (currentBatchSize.get() == 0) {
- readNextBatchAsync();
- }
- }
- else {
- currentBatchSize.decrementAndGet();
- }
-
- return buffer;
- }
-
- @Override
- public boolean registerListener(NotificationListener listener) throws IOException {
- checkNotNull(listener);
-
- checkError();
-
- synchronized (lock) {
- if (isReleased || !returnedBuffers.isEmpty()) {
- return false;
- }
-
- if (registeredListener.compareAndSet(null, listener)) {
- return true;
- } else {
- throw new IllegalStateException("already registered listener");
- }
- }
- }
-
- @Override
- public void notifySubpartitionConsumed() throws IOException {
- parent.onConsumedSubpartition();
- }
-
- @Override
- public void releaseAllResources() throws IOException {
- try {
- synchronized (lock) {
- if (!isReleased) {
- // Recycle all buffers. Buffers, which are in flight are recycled as soon as
- // they return from the I/O thread.
- Buffer buffer;
- while ((buffer = returnedBuffers.poll()) != null) {
- buffer.recycle();
- }
-
- isReleased = true;
- }
- }
- }
- finally {
- asyncFileReader.close();
- }
- }
-
- @Override
- public boolean isReleased() {
- return parent.isReleased() || isReleased;
- }
-
- @Override
- public Throwable getFailureCause() {
- return parent.getFailureCause();
- }
-
- /**
- * Requests buffers from the buffer provider and triggers asynchronous read requests to fill
- * them.
- *
- * <p> The number of requested buffers/triggered I/O read requests per call depends on the
- * configured size of batch reads.
- */
- private void readNextBatchAsync() throws IOException {
- // This does not need to be fully synchronized with actually reaching EOF as long as
- // we eventually notice it. In the worst case, we trigger some discarded reads and
- // notice it when the buffers are returned.
- //
- // We only trigger reads if the current batch size is 0.
- if (hasReachedEndOfFile || currentBatchSize.get() != 0) {
- return;
- }
-
- // Number of successful buffer requests or callback registrations. The call back will
- // trigger the read as soon as a buffer becomes available again.
- int i = 0;
-
- while (i < readBatchSize) {
- final Buffer buffer = bufferProvider.requestBuffer();
-
- if (buffer == null) {
- // Listen for buffer availability.
- currentBatchSize.incrementAndGet();
-
- if (bufferProvider.addListener(bufferAvailabilityListener)) {
- i++;
- }
- else if (bufferProvider.isDestroyed()) {
- currentBatchSize.decrementAndGet();
- return;
- }
- else {
- // Buffer available again
- currentBatchSize.decrementAndGet();
- }
- }
- else {
- currentBatchSize.incrementAndGet();
-
- asyncFileReader.readInto(buffer);
- }
- }
- }
-
- /**
- * Returns a buffer from the buffer provider.
- *
- * <p> Note: This method is called from the thread recycling the available buffer.
- */
- private void onAvailableBuffer(Buffer buffer) {
- try {
- asyncFileReader.readInto(buffer);
- }
- catch (IOException e) {
- notifyError(e);
- }
- }
-
- /**
- * Returns a successful buffer read request.
- *
- * <p> Note: This method is always called from the same I/O thread.
- */
- private void returnBufferFromIOThread(Buffer buffer) {
- final NotificationListener listener;
-
- synchronized (lock) {
- if (hasReachedEndOfFile || isReleased) {
- buffer.recycle();
-
- return;
- }
-
- returnedBuffers.add(buffer);
-
- // after this, the listener should be null
- listener = registeredListener.getAndSet(null);
-
- // If this was the last buffer before we reached EOF, set the corresponding flag to
- // ensure that further buffers are correctly recycled and eventually no further reads
- // are triggered.
- if (asyncFileReader.hasReachedEndOfFile()) {
- hasReachedEndOfFile = true;
- }
- }
-
- if (listener != null) {
- listener.onNotification();
- }
- }
-
- /**
- * Notifies the view about an error.
- */
- private void notifyError(IOException error) {
- if (errorInIOThread == null) {
- errorInIOThread = error;
- }
-
- final NotificationListener listener = registeredListener.getAndSet(null);
- if (listener != null) {
- listener.onNotification();
- }
- }
-
- /**
- * Checks whether an error has been reported and rethrow the respective Exception, if available.
- */
- private void checkError() throws IOException {
- if (errorInIOThread != null) {
- throw errorInIOThread;
- }
- }
-
- /**
- * Callback from the I/O thread.
- *
- * <p> Successful buffer read requests add the buffer to the subpartition view, and failed ones
- * notify about the error.
- */
- private static class IOThreadCallback implements RequestDoneCallback<Buffer> {
-
- private final SpilledSubpartitionViewAsyncIO subpartitionView;
-
- public IOThreadCallback(SpilledSubpartitionViewAsyncIO subpartitionView) {
- this.subpartitionView = subpartitionView;
- }
-
- @Override
- public void requestSuccessful(Buffer buffer) {
- subpartitionView.returnBufferFromIOThread(buffer);
- }
-
- @Override
- public void requestFailed(Buffer buffer, IOException error) {
- // Recycle the buffer and forward the error
- buffer.recycle();
-
- subpartitionView.notifyError(error);
- }
- }
-
- @Override
- public String toString() {
- return String.format("SpilledSubpartitionView[async](index: %d, file size: %d bytes) of ResultPartition %s",
- parent.index,
- fileSize,
- parent.parent.getPartitionId());
- }
-
- /**
- * Callback from the buffer provider.
- */
- private static class BufferProviderCallback implements EventListener<Buffer> {
-
- private final SpilledSubpartitionViewAsyncIO subpartitionView;
-
- private BufferProviderCallback(SpilledSubpartitionViewAsyncIO subpartitionView) {
- this.subpartitionView = subpartitionView;
- }
-
- @Override
- public void onEvent(Buffer buffer) {
- if (buffer == null) {
- return;
- }
-
- subpartitionView.onAvailableBuffer(buffer);
- }
- }
-}