You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2014/11/11 11:48:50 UTC
[3/3] incubator-flink git commit: [FLINK-1323] Refactor I/O Manager
Readers and Writers to interfaces,
add implementation that uses callbacks on completed write requests.
[FLINK-1323] Refactor I/O Manager Readers and Writers to interfaces, add implementation that uses callbacks on completed write requests.
- This change also allows for a very simple way of plugging in a synchronous version of the I/O manager.
This closes #193.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/c9cfe3ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/c9cfe3ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/c9cfe3ba
Branch: refs/heads/master
Commit: c9cfe3ba9a2009c3da2cb8a39090154c30ccd88c
Parents: 8e4c772
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Nov 7 18:45:19 2014 +0100
Committer: uce <uc...@apache.org>
Committed: Tue Nov 11 11:48:14 2014 +0100
----------------------------------------------------------------------
.../io/disk/ChannelReaderInputViewIterator.java | 6 +-
.../disk/iomanager/AbstractFileIOChannel.java | 106 ++++
.../disk/iomanager/AsynchronousBlockReader.java | 131 +++++
.../disk/iomanager/AsynchronousBlockWriter.java | 88 +++
.../AsynchronousBlockWriterWithCallback.java | 67 +++
.../iomanager/AsynchronousBulkBlockReader.java | 107 ++++
.../iomanager/AsynchronousFileIOChannel.java | 264 +++++++++
.../io/disk/iomanager/BlockChannelAccess.java | 272 ---------
.../io/disk/iomanager/BlockChannelReader.java | 87 +--
.../io/disk/iomanager/BlockChannelWriter.java | 89 +--
.../BlockChannelWriterWithCallback.java | 35 ++
.../disk/iomanager/BulkBlockChannelReader.java | 59 +-
.../runtime/io/disk/iomanager/Channel.java | 109 ----
.../io/disk/iomanager/ChannelAccess.java | 172 ------
.../disk/iomanager/ChannelReaderInputView.java | 3 +-
.../disk/iomanager/ChannelWriterOutputView.java | 3 +-
.../io/disk/iomanager/FileIOChannel.java | 156 ++++++
.../runtime/io/disk/iomanager/IOManager.java | 547 ++-----------------
.../io/disk/iomanager/IOManagerAsync.java | 449 +++++++++++++++
.../runtime/io/disk/iomanager/IORequest.java | 18 +-
.../io/disk/iomanager/QueuingCallback.java | 47 ++
.../io/disk/iomanager/RequestDoneCallback.java | 36 ++
.../runtime/io/disk/iomanager/RequestQueue.java | 22 +-
.../iomanager/SynchronousFileIOChannel.java | 45 ++
.../iterative/io/SerializedUpdateBuffer.java | 4 +-
.../runtime/operators/hash/HashPartition.java | 8 +-
.../operators/hash/MutableHashTable.java | 4 +-
.../operators/hash/ReOpenableHashPartition.java | 6 +-
.../hash/ReOpenableMutableHashTable.java | 4 +-
.../sort/CombiningUnilateralSortMerger.java | 22 +-
.../operators/sort/UnilateralSortMerger.java | 57 +-
.../flink/runtime/taskmanager/TaskManager.java | 3 +-
.../runtime/util/EnvironmentInformation.java | 8 +
.../apache/flink/runtime/blob/BlobKeyTest.java | 2 +-
.../flink/runtime/io/disk/ChannelViewsTest.java | 18 +-
.../runtime/io/disk/SpillingBufferTest.java | 4 +-
.../io/disk/iomanager/IOManagerITCase.java | 4 +-
.../IOManagerPerformanceBenchmark.java | 17 +-
.../io/disk/iomanager/IOManagerTest.java | 65 +--
.../flink/runtime/operators/CrossTaskTest.java | 7 +-
.../operators/MatchTaskExternalITCase.java | 5 +-
.../flink/runtime/operators/MatchTaskTest.java | 5 +-
.../operators/hash/HashMatchIteratorITCase.java | 8 +-
.../runtime/operators/hash/HashTableITCase.java | 9 +-
.../hash/HashTablePerformanceComparison.java | 3 +-
.../hash/ReOpenableHashTableITCase.java | 5 +-
.../SpillingResettableIteratorTest.java | 3 +-
...lingResettableMutableObjectIteratorTest.java | 3 +-
.../CombiningUnilateralSortMergerITCase.java | 3 +-
.../operators/sort/ExternalSortITCase.java | 3 +-
.../sort/MassiveStringSortingITCase.java | 5 +-
.../sort/SortMergeMatchIteratorITCase.java | 11 +-
.../operators/testutils/DriverTestBase.java | 3 +-
.../operators/testutils/MockEnvironment.java | 3 +-
.../operators/util/HashVsSortMiniBenchmark.java | 5 +-
55 files changed, 1776 insertions(+), 1449 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java
index 7eaf635..f38aa25 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java
@@ -27,7 +27,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
-import org.apache.flink.runtime.io.disk.iomanager.Channel;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.util.MutableObjectIterator;
@@ -46,14 +46,14 @@ public class ChannelReaderInputViewIterator<E> implements MutableObjectIterator<
private final List<MemorySegment> freeMemTarget;
- public ChannelReaderInputViewIterator(IOManager ioAccess, Channel.ID channel, List<MemorySegment> segments,
+ public ChannelReaderInputViewIterator(IOManager ioAccess, FileIOChannel.ID channel, List<MemorySegment> segments,
List<MemorySegment> freeMemTarget, TypeSerializer<E> accessors, int numBlocks)
throws IOException
{
this(ioAccess, channel, new LinkedBlockingQueue<MemorySegment>(), segments, freeMemTarget, accessors, numBlocks);
}
- public ChannelReaderInputViewIterator(IOManager ioAccess, Channel.ID channel, LinkedBlockingQueue<MemorySegment> returnQueue,
+ public ChannelReaderInputViewIterator(IOManager ioAccess, FileIOChannel.ID channel, LinkedBlockingQueue<MemorySegment> returnQueue,
List<MemorySegment> segments, List<MemorySegment> freeMemTarget, TypeSerializer<E> accessors, int numBlocks)
throws IOException
{
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java
new file mode 100644
index 0000000..ecb794e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public abstract class AbstractFileIOChannel implements FileIOChannel {
+
+ /** Logger object for channel and its subclasses */
+ protected static final Logger LOG = LoggerFactory.getLogger(FileIOChannel.class);
+
+ /** The ID of the underlying channel. */
+ protected final FileIOChannel.ID id;
+
+ /** A file channel for NIO access to the file. */
+ protected final FileChannel fileChannel;
+
+
+ /**
+ * Creates a new channel to the path indicated by the given ID. The channel hands IO requests to
+ * the given request queue to be processed.
+ *
+ * @param channelID The id describing the path of the file that the channel accessed.
+ * @param writeEnabled Flag describing whether the channel should be opened in read/write mode, rather
+ * than in read-only mode.
+ * @throws IOException Thrown, if the channel could no be opened.
+ */
+ protected AbstractFileIOChannel(FileIOChannel.ID channelID, boolean writeEnabled) throws IOException {
+ this.id = Preconditions.checkNotNull(channelID);
+
+ try {
+ @SuppressWarnings("resource")
+ RandomAccessFile file = new RandomAccessFile(id.getPath(), writeEnabled ? "rw" : "r");
+ this.fileChannel = file.getChannel();
+ }
+ catch (IOException e) {
+ throw new IOException("Channel to path '" + channelID.getPath() + "' could not be opened.", e);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Gets the channel ID of this channel.
+ *
+ * @return This channel's ID.
+ */
+ @Override
+ public final FileIOChannel.ID getChannelID() {
+ return this.id;
+ }
+
+ @Override
+ public abstract boolean isClosed();
+
+ @Override
+ public abstract void close() throws IOException;
+
+ @Override
+ public void deleteChannel() {
+ if (!isClosed() || this.fileChannel.isOpen()) {
+ throw new IllegalStateException("Cannot delete a channel that is open.");
+ }
+
+ // make a best effort to delete the file. Don't report exceptions.
+ try {
+ File f = new File(this.id.getPath());
+ if (f.exists()) {
+ f.delete();
+ }
+ } catch (Throwable t) {}
+ }
+
+ @Override
+ public void closeAndDelete() throws IOException {
+ try {
+ close();
+ } finally {
+ deleteChannel();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java
new file mode 100644
index 0000000..35273f4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+/**
+ * A reader that reads data in blocks from a file channel. The reader reads the blocks into a
+ * {@link org.apache.flink.core.memory.MemorySegment} in an asynchronous fashion. That is, a read
+ * request is not processed by the thread that issues it, but by an asynchronous reader thread. Once the read request
+ * is done, the asynchronous reader adds the full MemorySegment to a <i>return queue</i> where it can be popped by the
+ * worker thread, once it needs the data. The return queue is in this case a
+ * {@link java.util.concurrent.LinkedBlockingQueue}, such that the working thread blocks until the request has been served,
+ * if the request is still pending when the it requires the data.
+ * <p>
+ * Typical pre-fetching reads are done by issuing the read requests early and popping the return queue once the data
+ * is actually needed.
+ * <p>
+ * The reader has no notion whether the size of the memory segments is actually the size of the blocks on disk,
+ * or even whether the file was written in blocks of the same size, or in blocks at all. Ensuring that the
+ * writing and reading is consistent with each other (same blocks sizes) is up to the programmer.
+ */
+public class AsynchronousBlockReader extends AsynchronousFileIOChannel<ReadRequest> implements BlockChannelReader {
+
+ private final LinkedBlockingQueue<MemorySegment> returnSegments;
+
+ /**
+ * Creates a new block channel reader for the given channel.
+ *
+ * @param channelID The ID of the channel to read.
+ * @param requestQueue The request queue of the asynchronous reader thread, to which the I/O requests
+ * are added.
+ * @param returnSegments The return queue, to which the full Memory Segments are added.
+ * @throws IOException Thrown, if the underlying file channel could not be opened.
+ */
+ protected AsynchronousBlockReader(FileIOChannel.ID channelID, RequestQueue<ReadRequest> requestQueue,
+ LinkedBlockingQueue<MemorySegment> returnSegments)
+ throws IOException
+ {
+ super(channelID, requestQueue, new QueuingCallback(returnSegments), false);
+ this.returnSegments = returnSegments;
+ }
+
+ /**
+ * Issues a read request, which will asynchronously fill the given segment with the next block in the
+ * underlying file channel. Once the read request is fulfilled, the segment will be added to this reader's
+ * return queue.
+ *
+ * @param segment The segment to read the block into.
+ * @throws IOException Thrown, when the reader encounters an I/O error. Due to the asynchronous nature of the
+ * reader, the exception thrown here may have been caused by an earlier read request.
+ */
+ @Override
+ public void readBlock(MemorySegment segment) throws IOException {
+ // check the error state of this channel
+ checkErroneous();
+
+ // write the current buffer and get the next one
+ // the statements have to be in this order to avoid incrementing the counter
+ // after the channel has been closed
+ this.requestsNotReturned.incrementAndGet();
+ if (this.closed || this.requestQueue.isClosed()) {
+ // if we found ourselves closed after the counter increment,
+ // decrement the counter again and do not forward the request
+ this.requestsNotReturned.decrementAndGet();
+ throw new IOException("The reader has been closed.");
+ }
+ this.requestQueue.add(new SegmentReadRequest(this, segment));
+ }
+
+ /**
+ * Gets the next memory segment that has been filled with data by the reader. This method blocks until
+ * such a segment is available, or until an error occurs in the reader, or the reader is closed.
+ * <p>
+ * WARNING: If this method is invoked without any segment ever returning (for example, because the
+ * {@link #readBlock(MemorySegment)} method has not been invoked appropriately), the method may block
+ * forever.
+ *
+ * @return The next memory segment from the reader's return queue.
+ * @throws IOException Thrown, if an I/O error occurs in the reader while waiting for the request to return.
+ */
+ @Override
+ public MemorySegment getNextReturnedSegment() throws IOException {
+ try {
+ while (true) {
+ final MemorySegment next = this.returnSegments.poll(1000, TimeUnit.MILLISECONDS);
+ if (next != null) {
+ return next;
+ } else {
+ if (this.closed) {
+ throw new IOException("The reader has been asynchronously closed.");
+ }
+ checkErroneous();
+ }
+ }
+ } catch (InterruptedException iex) {
+ throw new IOException("Reader was interrupted while waiting for the next returning segment.");
+ }
+ }
+
+ /**
+ * Gets the queue in which the full memory segments are queued after the asynchronous read
+ * is complete.
+ *
+ * @return The queue with the full memory segments.
+ */
+ @Override
+ public LinkedBlockingQueue<MemorySegment> getReturnQueue() {
+ return this.returnSegments;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriter.java
new file mode 100644
index 0000000..7e1681f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriter.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+public class AsynchronousBlockWriter extends AsynchronousBlockWriterWithCallback implements BlockChannelWriter {
+
+ private final LinkedBlockingQueue<MemorySegment> returnSegments;
+
+ /**
+ * Creates a new block channel writer for the given channel.
+ *
+ * @param channelID The ID of the channel to write to.
+ * @param requestQueue The request queue of the asynchronous writer thread, to which the I/O requests
+ * are added.
+ * @param returnSegments The return queue, to which the processed Memory Segments are added.
+ * @throws IOException Thrown, if the underlying file channel could not be opened exclusively.
+ */
+ protected AsynchronousBlockWriter(FileIOChannel.ID channelID, RequestQueue<WriteRequest> requestQueue,
+ LinkedBlockingQueue<MemorySegment> returnSegments)
+ throws IOException
+ {
+ super(channelID, requestQueue, new QueuingCallback(returnSegments));
+ this.returnSegments = returnSegments;
+ }
+
+ /**
+ * Gets the next memory segment that has been written and is available again.
+ * This method blocks until such a segment is available, or until an error occurs in the writer, or the
+ * writer is closed.
+ * <p>
+ * NOTE: If this method is invoked without any segment ever returning (for example, because the
+ * {@link #writeBlock(MemorySegment)} method has not been invoked accordingly), the method may block
+ * forever.
+ *
+ * @return The next memory segment from the writers's return queue.
+ * @throws IOException Thrown, if an I/O error occurs in the writer while waiting for the request to return.
+ */
+ @Override
+ public MemorySegment getNextReturnedSegment() throws IOException {
+ try {
+ while (true) {
+ final MemorySegment next = returnSegments.poll(1000, TimeUnit.MILLISECONDS);
+ if (next != null) {
+ return next;
+ } else {
+ if (this.closed) {
+ throw new IOException("The writer has been closed.");
+ }
+ checkErroneous();
+ }
+ }
+ } catch (InterruptedException e) {
+ throw new IOException("Writer was interrupted while waiting for the next returning segment.");
+ }
+ }
+
+ /**
+ * Gets the queue in which the memory segments are queued after the asynchronous write is completed.
+ *
+ * @return The queue with the written memory segments.
+ */
+ @Override
+ public LinkedBlockingQueue<MemorySegment> getReturnQueue() {
+ return this.returnSegments;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriterWithCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriterWithCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriterWithCallback.java
new file mode 100644
index 0000000..6b6fb36
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriterWithCallback.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+/**
+ * An asynchronous implementation of the {@link BlockChannelWriterWithCallback} that queues I/O requests
+ * and calls a callback once they have been handled.
+ */
+public class AsynchronousBlockWriterWithCallback extends AsynchronousFileIOChannel<WriteRequest> implements BlockChannelWriterWithCallback {
+
+ /**
+ * Creates a new asynchronous block writer for the given channel.
+ *
+ * @param channelID The ID of the channel to write to.
+ * @param requestQueue The request queue of the asynchronous writer thread, to which the I/O requests are added.
+ * @param callback The callback to be invoked when requests are done.
+ * @throws IOException Thrown, if the underlying file channel could not be opened exclusively.
+ */
+ protected AsynchronousBlockWriterWithCallback(FileIOChannel.ID channelID, RequestQueue<WriteRequest> requestQueue,
+ RequestDoneCallback callback) throws IOException
+ {
+ super(channelID, requestQueue, callback, true);
+ }
+
+ /**
+ * Issues a asynchronous write request to the writer.
+ *
+ * @param segment The segment to be written.
+ * @throws IOException Thrown, when the writer encounters an I/O error. Due to the asynchronous nature of the
+ * writer, the exception thrown here may have been caused by an earlier write request.
+ */
+ @Override
+ public void writeBlock(MemorySegment segment) throws IOException {
+ // check the error state of this channel
+ checkErroneous();
+
+ // write the current buffer and get the next one
+ this.requestsNotReturned.incrementAndGet();
+ if (this.closed || this.requestQueue.isClosed()) {
+ // if we found ourselves closed after the counter increment,
+ // decrement the counter again and do not forward the request
+ this.requestsNotReturned.decrementAndGet();
+ throw new IOException("The writer has been closed.");
+ }
+ this.requestQueue.add(new SegmentWriteRequest(this, segment));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBulkBlockReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBulkBlockReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBulkBlockReader.java
new file mode 100644
index 0000000..048f82f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBulkBlockReader.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+/**
+ *
+ */
+public class AsynchronousBulkBlockReader extends AsynchronousFileIOChannel<ReadRequest> implements BulkBlockChannelReader {
+
+ private final ArrayList<MemorySegment> returnBuffers;
+
+
+ protected AsynchronousBulkBlockReader(FileIOChannel.ID channelID, RequestQueue<ReadRequest> requestQueue,
+ List<MemorySegment> sourceSegments, int numBlocks)
+ throws IOException
+ {
+ this (channelID, requestQueue, sourceSegments, numBlocks, new ArrayList<MemorySegment>(numBlocks));
+ }
+
+ private AsynchronousBulkBlockReader(FileIOChannel.ID channelID, RequestQueue<ReadRequest> requestQueue,
+ List<MemorySegment> sourceSegments, int numBlocks, ArrayList<MemorySegment> target)
+ throws IOException
+ {
+ super(channelID, requestQueue, new CollectingCallback(target), false);
+ this.returnBuffers = target;
+
+ // sanity check
+ if (sourceSegments.size() < numBlocks) {
+ throw new IllegalArgumentException("The list of source memory segments must contain at least" +
+ " as many segments as the number of blocks to read.");
+ }
+
+ // send read requests for all blocks
+ for (int i = 0; i < numBlocks; i++) {
+ readBlock(sourceSegments.remove(sourceSegments.size() - 1));
+ }
+ }
+
+ private void readBlock(MemorySegment segment) throws IOException {
+ // check the error state of this channel
+ checkErroneous();
+
+ // write the current buffer and get the next one
+ this.requestsNotReturned.incrementAndGet();
+ if (this.closed || this.requestQueue.isClosed()) {
+ // if we found ourselves closed after the counter increment,
+ // decrement the counter again and do not forward the request
+ this.requestsNotReturned.decrementAndGet();
+ throw new IOException("The reader has been closed.");
+ }
+ this.requestQueue.add(new SegmentReadRequest(this, segment));
+ }
+
+ @Override
+ public List<MemorySegment> getFullSegments() {
+ synchronized (this.closeLock) {
+ if (!this.isClosed() || this.requestsNotReturned.get() > 0) {
+ throw new IllegalStateException("Full segments can only be obtained after the reader was properly closed.");
+ }
+ }
+
+ return this.returnBuffers;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private static final class CollectingCallback implements RequestDoneCallback {
+
+ private final ArrayList<MemorySegment> list;
+
+ public CollectingCallback(ArrayList<MemorySegment> list) {
+ this.list = list;
+ }
+
+ @Override
+ public void requestSuccessful(MemorySegment buffer) {
+ list.add(buffer);
+ }
+
+ @Override
+ public void requestFailed(MemorySegment buffer, IOException e) {
+ list.add(buffer);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
new file mode 100644
index 0000000..098b334
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+/**
+ * A base class for readers and writers that accept read or write requests for whole blocks.
+ * The request is delegated to an asynchronous I/O thread. After completion of the I/O request, the memory
+ * segment of the block is added to a collection to be returned.
+ * <p>
+ * The asynchrony of the access makes it possible to implement read-ahead or write-behind types of I/O accesses.
+ *
+ * @param <R> The type of request (e.g. <tt>ReadRequest</tt> or <tt>WriteRequest</tt> issued by this access to the I/O threads.
+ */
+public abstract class AsynchronousFileIOChannel<R extends IORequest> extends AbstractFileIOChannel {
+
+ /** The lock that is used during closing to synchronize the thread that waits for all
+ * requests to be handled with the asynchronous I/O thread. */
+ protected final Object closeLock = new Object();
+
+ /** A request queue for submitting asynchronous requests to the corresponding IO worker thread. */
+ protected final RequestQueue<R> requestQueue;
+
+ /** An atomic integer that counts the number of requests that we still wait for to return. */
+ protected final AtomicInteger requestsNotReturned = new AtomicInteger(0);
+
+ /** Hander for completed requests */
+ protected final RequestDoneCallback resultHander;
+
+ /** An exception that was encountered by the asynchronous request handling thread.*/
+ protected volatile IOException exception;
+
+ /** Flag marking this channel as closed */
+ protected volatile boolean closed;
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Creates a new channel access to the path indicated by the given ID. The channel accepts buffers to be
+ * read/written and hands them to the asynchronous I/O thread. After being processed, the buffers
+ * are returned by adding the to the given queue.
+ *
+ * @param channelID The id describing the path of the file that the channel accessed.
+ * @param requestQueue The queue that this channel hands its IO requests to.
+ * @param callback The callback to be invoked when a request is done.
+ * @param writeEnabled Flag describing whether the channel should be opened in read/write mode, rather
+ * than in read-only mode.
+ * @throws IOException Thrown, if the channel could no be opened.
+ */
+ protected AsynchronousFileIOChannel(FileIOChannel.ID channelID, RequestQueue<R> requestQueue,
+ RequestDoneCallback callback, boolean writeEnabled) throws IOException
+ {
+ super(channelID, writeEnabled);
+
+ if (requestQueue == null) {
+ throw new NullPointerException();
+ }
+
+ this.requestQueue = requestQueue;
+ this.resultHander = callback;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public boolean isClosed() {
+ return this.closed;
+ }
+
+ /**
+ * Closes the reader and waits until all pending asynchronous requests are
+ * handled. Even if an exception interrupts the closing, the underlying <tt>FileChannel</tt> is closed.
+ *
+ * @throws IOException Thrown, if an I/O exception occurred while waiting for the buffers, or if
+ * the closing was interrupted.
+ */
+ public void close() throws IOException {
+ // atomically set the close flag
+ synchronized (this.closeLock) {
+ if (this.closed) {
+ return;
+ }
+ this.closed = true;
+
+ try {
+ // wait until as many buffers have been returned as were written
+ // only then is everything guaranteed to be consistent.{
+ while (this.requestsNotReturned.get() > 0) {
+ try {
+ // we add a timeout here, because it is not guaranteed that the
+ // decrementing during buffer return and the check here are deadlock free.
+ // the deadlock situation is however unlikely and caught by the timeout
+ this.closeLock.wait(1000);
+ checkErroneous();
+ }
+ catch (InterruptedException iex) {}
+ }
+ }
+ finally {
+ // close the file
+ if (this.fileChannel.isOpen()) {
+ this.fileChannel.close();
+ }
+ }
+ }
+ }
+
+ /**
+ * This method waits for all pending asynchronous requests to return. When the
+ * last request has returned, the channel is closed and deleted.
+ * <p>
+ * Even if an exception interrupts the closing, such that not all request are handled,
+ * the underlying <tt>FileChannel</tt> is closed and deleted.
+ *
+ * @throws IOException Thrown, if an I/O exception occurred while waiting for the buffers, or if the closing was interrupted.
+ */
+ public void closeAndDelete() throws IOException {
+ try {
+ close();
+ }
+ finally {
+ deleteChannel();
+ }
+ }
+
+ /**
+ * Checks the exception state of this channel. The channel is erroneous, if one of its requests could not
+ * be processed correctly.
+ *
+ * @throws IOException Thrown, if the channel is erroneous. The thrown exception contains the original exception
+ * that defined the erroneous state as its cause.
+ */
+ public final void checkErroneous() throws IOException {
+ if (this.exception != null) {
+ throw this.exception;
+ }
+ }
+
+ /**
+ * Handles a processed <tt>Buffer</tt>. This method is invoked by the
+ * asynchronous IO worker threads upon completion of the IO request with the
+ * provided buffer and/or an exception that occurred while processing the request
+ * for that buffer.
+ *
+ * @param buffer The buffer to be processed.
+ * @param ex The exception that occurred in the I/O threads when processing the buffer's request.
+ */
+ final void handleProcessedBuffer(MemorySegment buffer, IOException ex) {
+ // even if the callbacks throw an error, we need to maintain our bookkeeping
+ try {
+ if (ex != null && this.exception == null) {
+ this.exception = ex;
+ this.resultHander.requestFailed(buffer, ex);
+ }
+ else {
+ this.resultHander.requestSuccessful(buffer);
+ }
+ }
+ finally {
+ // decrement the number of missing buffers. If we are currently closing, notify the
+ if (this.closed) {
+ synchronized (this.closeLock) {
+ int num = this.requestsNotReturned.decrementAndGet();
+ if (num == 0) {
+ this.closeLock.notifyAll();
+ }
+ }
+ }
+ else {
+ this.requestsNotReturned.decrementAndGet();
+ }
+ }
+ }
+}
+
+//--------------------------------------------------------------------------------------------
+
+/**
+ * Read request that reads an entire memory segment from a block reader.
+ */
+final class SegmentReadRequest implements ReadRequest {
+
+ private final AsynchronousFileIOChannel<ReadRequest> channel;
+
+ private final MemorySegment segment;
+
+ protected SegmentReadRequest(AsynchronousFileIOChannel<ReadRequest> targetChannel, MemorySegment segment) {
+ this.channel = targetChannel;
+ this.segment = segment;
+ }
+
+ @Override
+ public void read() throws IOException {
+ final FileChannel c = this.channel.fileChannel;
+ if (c.size() - c.position() > 0) {
+ try {
+ final ByteBuffer wrapper = this.segment.wrap(0, this.segment.size());
+ this.channel.fileChannel.read(wrapper);
+ }
+ catch (NullPointerException npex) {
+ throw new IOException("Memory segment has been released.");
+ }
+ }
+ }
+
+ @Override
+ public void requestDone(IOException ioex) {
+ this.channel.handleProcessedBuffer(this.segment, ioex);
+ }
+}
+
+//--------------------------------------------------------------------------------------------
+
+/**
+ * Write request that writes an entire memory segment to the block writer.
+ */
+final class SegmentWriteRequest implements WriteRequest {
+
+ private final AsynchronousFileIOChannel<WriteRequest> channel;
+
+ private final MemorySegment segment;
+
+ protected SegmentWriteRequest(AsynchronousFileIOChannel<WriteRequest> targetChannel, MemorySegment segment) {
+ this.channel = targetChannel;
+ this.segment = segment;
+ }
+
+ @Override
+ public void write() throws IOException {
+ try {
+ this.channel.fileChannel.write(this.segment.wrap(0, this.segment.size()));
+ }
+ catch (NullPointerException npex) {
+ throw new IOException("Memory segment has been released.");
+ }
+ }
+
+ @Override
+ public void requestDone(IOException ioex) {
+ this.channel.handleProcessedBuffer(this.segment, ioex);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelAccess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelAccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelAccess.java
deleted file mode 100644
index f19586d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelAccess.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.disk.iomanager;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.Collection;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.flink.core.memory.MemorySegment;
-
-
-/**
- * A base class for readers and writers that accept read or write requests for whole blocks.
- * The request is delegated to an asynchronous I/O thread. After completion of the I/O request, the memory
- * segment of the block is added to a collection to be returned.
- * <p>
- * The asynchrony of the access makes it possible to implement read-ahead or write-behind types of I/O accesses.
- *
- *
- * @param <R> The type of request (e.g. <tt>ReadRequest</tt> or <tt>WriteRequest</tt> issued by this access to
- * the I/O threads.
- * @param <C> The type of collection used to collect the segments from completed requests. Those segments are for
- * example for write requests the written and reusable segments, and for read requests the now full
- * and usable segments. The collection type may for example be a synchronized queue or an unsynchronized
- * list.
- */
-public abstract class BlockChannelAccess<R extends IORequest, C extends Collection<MemorySegment>> extends ChannelAccess<MemorySegment, R>
-{
- /**
- * The lock that is used during closing to synchronize the thread that waits for all
- * requests to be handled with the asynchronous I/O thread.
- */
- protected final Object closeLock = new Object();
-
- /**
- * An atomic integer that counts the number of buffers we still wait for to return.
- */
- protected final AtomicInteger requestsNotReturned = new AtomicInteger(0);
-
- /**
- * The collection gathering the processed buffers that are ready to be (re)used.
- */
- protected final C returnBuffers;
-
- /**
- * Flag marking this channel as closed;
- */
- protected volatile boolean closed;
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Creates a new channel access to the path indicated by the given ID. The channel accepts buffers to be
- * read/written and hands them to the asynchronous I/O thread. After being processed, the buffers
- * are returned by adding the to the given queue.
- *
- * @param channelID The id describing the path of the file that the channel accessed.
- * @param requestQueue The queue that this channel hands its IO requests to.
- * @param returnQueue The queue to which the segments are added after their buffer was written.
- * @param writeEnabled Flag describing whether the channel should be opened in read/write mode, rather
- * than in read-only mode.
- * @throws IOException Thrown, if the channel could no be opened.
- */
- protected BlockChannelAccess(Channel.ID channelID, RequestQueue<R> requestQueue,
- C returnQueue, boolean writeEnabled)
- throws IOException
- {
- super(channelID, requestQueue, writeEnabled);
-
- if (requestQueue == null) {
- throw new NullPointerException();
- }
-
- this.returnBuffers = returnQueue;
- }
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Gets the queue (or list) to which the asynchronous reader adds its elements.
- *
- * @return The queue (or list) to which the asynchronous reader adds its elements.
- */
- public C getReturnQueue()
- {
- return this.returnBuffers;
- }
-
-
- @Override
- public boolean isClosed()
- {
- return this.closed;
- }
-
- /**
- * Closes the reader and waits until all pending asynchronous requests are
- * handled. Even if an exception interrupts the closing, the underlying <tt>FileChannel</tt> is
- * closed.
- *
- * @throws IOException Thrown, if an I/O exception occurred while waiting for the buffers, or if
- * the closing was interrupted.
- */
- public void close() throws IOException
- {
- // atomically set the close flag
- synchronized (this.closeLock) {
- if (this.closed) {
- return;
- }
- this.closed = true;
-
- try {
- // wait until as many buffers have been returned as were written
- // only then is everything guaranteed to be consistent.{
- while (this.requestsNotReturned.get() > 0) {
- try {
- // we add a timeout here, because it is not guaranteed that the
- // decrementing during buffer return and the check here are deadlock free.
- // the deadlock situation is however unlikely and caught by the timeout
- this.closeLock.wait(1000);
- checkErroneous();
- }
- catch (InterruptedException iex) {}
- }
- }
- finally {
- // close the file
- if (this.fileChannel.isOpen()) {
- this.fileChannel.close();
- }
- }
- }
- }
-
- /**
- * This method waits for all pending asynchronous requests to return. When the
- * last request has returned, the channel is closed and deleted.
- *
- * Even if an exception interrupts the closing, such that not all request are handled,
- * the underlying <tt>FileChannel</tt> is closed and deleted.
- *
- * @throws IOException Thrown, if an I/O exception occurred while waiting for the buffers, or if
- * the closing was interrupted.
- */
- public void closeAndDelete() throws IOException
- {
- try {
- close();
- }
- finally {
- deleteChannel();
- }
- }
-
-
- @Override
- protected void returnBuffer(MemorySegment buffer)
- {
- this.returnBuffers.add(buffer);
-
- // decrement the number of missing buffers. If we are currently closing, notify the
- if (this.closed) {
- synchronized (this.closeLock) {
- int num = this.requestsNotReturned.decrementAndGet();
- if (num == 0) {
- this.closeLock.notifyAll();
- }
- }
- }
- else {
- this.requestsNotReturned.decrementAndGet();
- }
- }
-}
-
-//--------------------------------------------------------------------------------------------
-
-/**
- * Special read request that reads an entire memory segment from a block reader.
- */
-final class SegmentReadRequest implements ReadRequest
-{
- private final BlockChannelAccess<ReadRequest, ?> channel;
-
- private final MemorySegment segment;
-
- protected SegmentReadRequest(BlockChannelAccess<ReadRequest, ?> targetChannel, MemorySegment segment)
- {
- this.channel = targetChannel;
- this.segment = segment;
- }
-
-
- @Override
- public void read() throws IOException
- {
- final FileChannel c = this.channel.fileChannel;
- if (c.size() - c.position() > 0) {
- try {
- final ByteBuffer wrapper = this.segment.wrap(0, this.segment.size());
- this.channel.fileChannel.read(wrapper);
- } catch (NullPointerException npex) {
- // the memory has been cleared asynchronouosly through task failing or canceling
- // ignore the request, since the result cannot be read
- }
- }
- }
-
-
- @Override
- public void requestDone(IOException ioex)
- {
- this.channel.handleProcessedBuffer(this.segment, ioex);
- }
-}
-
-//--------------------------------------------------------------------------------------------
-
-/**
- * Special write request that writes an entire memory segment to the block writer.
- */
-final class SegmentWriteRequest implements WriteRequest
-{
- private final BlockChannelAccess<WriteRequest, ?> channel;
-
- private final MemorySegment segment;
-
- protected SegmentWriteRequest(BlockChannelAccess<WriteRequest, ?> targetChannel, MemorySegment segment)
- {
- this.channel = targetChannel;
- this.segment = segment;
- }
-
-
- @Override
- public void write() throws IOException
- {
- try {
- this.channel.fileChannel.write(this.segment.wrap(0, this.segment.size()));
- } catch (NullPointerException npex) {
- // the memory has been cleared asynchronouosly through task failing or canceling
- // ignore the request, since there is nothing to write.
- }
- }
-
-
- @Override
- public void requestDone(IOException ioex)
- {
- this.channel.handleProcessedBuffer(this.segment, ioex);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java
index f674ad4..f25827a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java
@@ -16,76 +16,30 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.io.disk.iomanager;
import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
import org.apache.flink.core.memory.MemorySegment;
-
/**
* A reader that reads data in blocks from a file channel. The reader reads the blocks into a
- * {@link org.apache.flink.core.memory.MemorySegment} in an asynchronous fashion. That is, a read
- * request is not processed by the thread that issues it, but by an asynchronous reader thread. Once the read request
- * is done, the asynchronous reader adds the full MemorySegment to a <i>return queue</i> where it can be popped by the
- * worker thread, once it needs the data. The return queue is in this case a
- * {@link java.util.concurrent.LinkedBlockingQueue}, such that the working thread blocks until the request has been served,
- * if the request is still pending when the it requires the data.
- * <p>
- * Typical pre-fetching reads are done by issuing the read requests early and popping the return queue once the data
- * is actually needed.
- * <p>
- * The reader has no notion whether the size of the memory segments is actually the size of the blocks on disk,
- * or even whether the file was written in blocks of the same size, or in blocks at all. Ensuring that the
- * writing and reading is consistent with each other (same blocks sizes) is up to the programmer.
+ * {@link org.apache.flink.core.memory.MemorySegment}. To support asynchronous implementations,
+ * the read method does not immediately return the full memory segment, but rather adds it to
+ * a blocking queue of finished read operations.
*/
-public class BlockChannelReader extends BlockChannelAccess<ReadRequest, LinkedBlockingQueue<MemorySegment>>
-{
- /**
- * Creates a new block channel reader for the given channel.
- *
- * @param channelID The ID of the channel to read.
- * @param requestQueue The request queue of the asynchronous reader thread, to which the I/O requests
- * are added.
- * @param returnSegments The return queue, to which the full Memory Segments are added.
- * @throws IOException Thrown, if the underlying file channel could not be opened.
- */
- protected BlockChannelReader(Channel.ID channelID, RequestQueue<ReadRequest> requestQueue,
- LinkedBlockingQueue<MemorySegment> returnSegments, int numRequestsToBundle)
- throws IOException
- {
- super(channelID, requestQueue, returnSegments, false);
- }
+public interface BlockChannelReader extends FileIOChannel {
/**
- * Issues a read request, which will asynchronously fill the given segment with the next block in the
+ * Issues a read request, which will fill the given segment with the next block in the
* underlying file channel. Once the read request is fulfilled, the segment will be added to this reader's
* return queue.
*
* @param segment The segment to read the block into.
- * @throws IOException Thrown, when the reader encounters an I/O error. Due to the asynchronous nature of the
- * reader, the exception thrown here may have been caused by an earlier read request.
+ * @throws IOException Thrown, when the reader encounters an I/O error.
*/
- public void readBlock(MemorySegment segment) throws IOException
- {
- // check the error state of this channel
- checkErroneous();
-
- // write the current buffer and get the next one
- // the statements have to be in this order to avoid incrementing the counter
- // after the channel has been closed
- this.requestsNotReturned.incrementAndGet();
- if (this.closed || this.requestQueue.isClosed()) {
- // if we found ourselves closed after the counter increment,
- // decrement the counter again and do not forward the request
- this.requestsNotReturned.decrementAndGet();
- throw new IOException("The reader has been closed.");
- }
- this.requestQueue.add(new SegmentReadRequest(this, segment));
- }
+ void readBlock(MemorySegment segment) throws IOException;
/**
* Gets the next memory segment that has been filled with data by the reader. This method blocks until
@@ -98,22 +52,13 @@ public class BlockChannelReader extends BlockChannelAccess<ReadRequest, LinkedBl
* @return The next memory segment from the reader's return queue.
* @throws IOException Thrown, if an I/O error occurs in the reader while waiting for the request to return.
*/
- public MemorySegment getNextReturnedSegment() throws IOException
- {
- try {
- while (true) {
- final MemorySegment next = this.returnBuffers.poll(2000, TimeUnit.MILLISECONDS);
- if (next != null) {
- return next;
- } else {
- if (this.closed) {
- throw new IOException("The reader has been asynchronously closed.");
- }
- checkErroneous();
- }
- }
- } catch (InterruptedException iex) {
- throw new IOException("Reader was interrupted while waiting for the next returning segment.");
- }
- }
+ public MemorySegment getNextReturnedSegment() throws IOException;
+
+ /**
+ * Gets the queue in which the full memory segments are queued after the read is complete.
+ *
+ * @return The queue with the full memory segments.
+ */
+ LinkedBlockingQueue<MemorySegment> getReturnQueue();
}
+
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriter.java
index 44a2edb..25c74e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriter.java
@@ -16,101 +16,40 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.io.disk.iomanager;
-
import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
import org.apache.flink.core.memory.MemorySegment;
-
/**
* A writer that writes data in blocks to a file channel. The writer receives the data blocks in the form of
* {@link org.apache.flink.core.memory.MemorySegment}, which it writes entirely to the channel,
- * regardless of how space in the segment is used. The writing happens in an asynchronous fashion. That is, a write
- * request is not processed by the thread that issues it, but by an asynchronous writer thread. Once the request
- * is done, the asynchronous writer adds the MemorySegment to a <i>return queue</i> where it can be popped by the
- * worker thread, to be reused. The return queue is in this case a
- * {@link java.util.concurrent.LinkedBlockingQueue}, such that the working thread blocks until the request has been served,
- * if the request is still pending when the it requires the segment back.
- * <p>
- * Typical write behind is realized, by having a small set of segments in the return queue at all times. When a
- * memory segment must be written, the request is issued to the writer and a new segment is immediately popped from
- * the return queue. Once too many requests have been issued and the I/O thread cannot keep up, the working thread
- * naturally blocks until another segment is available again.
+ * regardless of how space in the segment is used. The writing may be realized synchronously, or asynchronously,
+ * depending on the implementation.
*/
-public class BlockChannelWriter extends BlockChannelAccess<WriteRequest, LinkedBlockingQueue<MemorySegment>>
-{
- /**
- * Creates a new block channel writer for the given channel.
- *
- * @param channelID The ID of the channel to write to.
- * @param requestQueue The request queue of the asynchronous writer thread, to which the I/O requests
- * are added.
- * @param returnSegments The return queue, to which the processed Memory Segments are added.
- * @throws IOException Thrown, if the underlying file channel could not be opened exclusively.
- */
- protected BlockChannelWriter(Channel.ID channelID, RequestQueue<WriteRequest> requestQueue,
- LinkedBlockingQueue<MemorySegment> returnSegments, int numRequestsToBundle)
- throws IOException
- {
- super(channelID, requestQueue, returnSegments, true);
- }
-
- /**
- * Issues a asynchronous write request to the writer.
- *
- * @param segment The segment to be written.
- * @throws IOException Thrown, when the writer encounters an I/O error. Due to the asynchronous nature of the
- * writer, the exception thrown here may have been caused by an earlier write request.
- */
- public void writeBlock(MemorySegment segment) throws IOException
- {
- // check the error state of this channel
- checkErroneous();
-
- // write the current buffer and get the next one
- this.requestsNotReturned.incrementAndGet();
- if (this.closed || this.requestQueue.isClosed()) {
- // if we found ourselves closed after the counter increment,
- // decrement the counter again and do not forward the request
- this.requestsNotReturned.decrementAndGet();
- throw new IOException("The writer has been closed.");
- }
- this.requestQueue.add(new SegmentWriteRequest(this, segment));
- }
+public interface BlockChannelWriter extends BlockChannelWriterWithCallback {
/**
* Gets the next memory segment that has been written and is available again.
* This method blocks until such a segment is available, or until an error occurs in the writer, or the
* writer is closed.
* <p>
- * WARNING: If this method is invoked without any segment ever returning (for example, because the
- * {@link #writeBlock(MemorySegment)} method has not been invoked appropriately), the method may block
+ * NOTE: If this method is invoked without any segment ever returning (for example, because the
+ * {@link #writeBlock(MemorySegment)} method has not been invoked accordingly), the method may block
* forever.
*
* @return The next memory segment from the writers's return queue.
* @throws IOException Thrown, if an I/O error occurs in the writer while waiting for the request to return.
*/
- public MemorySegment getNextReturnedSegment() throws IOException
- {
- try {
- while (true) {
- final MemorySegment next = this.returnBuffers.poll(2000, TimeUnit.MILLISECONDS);
- if (next != null) {
- return next;
- } else {
- if (this.closed) {
- throw new IOException("The writer has been closed.");
- }
- checkErroneous();
- }
- }
- } catch (InterruptedException iex) {
- throw new IOException("Writer was interrupted while waiting for the next returning segment.");
- }
- }
+ MemorySegment getNextReturnedSegment() throws IOException;
+
+ /**
+ * Gets the queue in which the memory segments are queued after the asynchronous write
+ * is completed
+ *
+ * @return The queue with the written memory segments.
+ */
+ LinkedBlockingQueue<MemorySegment> getReturnQueue();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java
new file mode 100644
index 0000000..57bc7e0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+public interface BlockChannelWriterWithCallback extends FileIOChannel {
+
+ /**
+ * Writes the given memory segment. The request may be executed synchronously, or asynchronously, depending
+ * on the implementation.
+ *
+ * @param segment The segment to be written.
+ * @throws IOException Thrown, when the writer encounters an I/O error.
+ */
+ void writeBlock(MemorySegment segment) throws IOException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BulkBlockChannelReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BulkBlockChannelReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BulkBlockChannelReader.java
index 3be85d1..84883e7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BulkBlockChannelReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BulkBlockChannelReader.java
@@ -16,71 +16,16 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.io.disk.iomanager;
-import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
import org.apache.flink.core.memory.MemorySegment;
-
/**
*
- *
*/
-public class BulkBlockChannelReader extends BlockChannelAccess<ReadRequest, ArrayList<MemorySegment>>
-{
+public interface BulkBlockChannelReader extends FileIOChannel {
-
- protected BulkBlockChannelReader(Channel.ID channelID, RequestQueue<ReadRequest> requestQueue,
- List<MemorySegment> sourceSegments, int numBlocks)
- throws IOException
- {
- super(channelID, requestQueue, new ArrayList<MemorySegment>(numBlocks), false);
-
- // sanity check
- if (sourceSegments.size() < numBlocks) {
- throw new IllegalArgumentException("The list of source memory segments must contain at least" +
- " as many segments as the number of blocks to read.");
- }
-
- // send read requests for all blocks
- for (int i = 0; i < numBlocks; i++) {
- readBlock(sourceSegments.remove(sourceSegments.size() - 1));
- }
- }
-
-
-
- private void readBlock(MemorySegment segment) throws IOException
- {
- // check the error state of this channel
- checkErroneous();
-
- // write the current buffer and get the next one
- this.requestsNotReturned.incrementAndGet();
- if (this.closed || this.requestQueue.isClosed()) {
- // if we found ourselves closed after the counter increment,
- // decrement the counter again and do not forward the request
- this.requestsNotReturned.decrementAndGet();
- throw new IOException("The reader has been closed.");
- }
- this.requestQueue.add(new SegmentReadRequest(this, segment));
- }
-
- public List<MemorySegment> getFullSegments()
- {
- synchronized (this.closeLock) {
- if (!this.isClosed() || this.requestsNotReturned.get() > 0) {
- throw new IllegalStateException("Full segments can only be obtained after the reader was properly closed.");
- }
- }
-
- return this.returnBuffers;
- }
-
+ List<MemorySegment> getFullSegments();
}
-
-
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/Channel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/Channel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/Channel.java
deleted file mode 100644
index 7e64e79..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/Channel.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.disk.iomanager;
-
-import java.io.File;
-import java.util.Random;
-
-import org.apache.flink.util.StringUtils;
-
-/**
- * A Channel represents a collection of files that belong logically to the same resource. An example is a collection of
- * files that contain sorted runs of data from the same stream, that will later on be merged together.
- *
- */
-public final class Channel
-{
- private static final int RANDOM_BYTES_LENGTH = 16;
-
- /**
- * An ID identifying an underlying fileChannel.
- *
- */
- public static class ID
- {
- private final String path;
-
- private final int threadNum;
-
- protected ID(final String path, final int threadNum) {
- this.path = path;
- this.threadNum = threadNum;
- }
-
- protected ID(final String basePath, final int threadNum, final Random random)
- {
- this.path = basePath + File.separator + randomString(random) + ".channel";
- this.threadNum = threadNum;
- }
-
- /**
- * Returns the path to the underlying temporary file.
- */
- public String getPath() {
- return path;
- }
-
- int getThreadNum() {
- return this.threadNum;
- }
-
- public String toString() {
- return path;
- }
- }
-
- public static final class Enumerator
- {
- private static final String FORMAT = "%s%s%s.%06d.channel";
-
- private final String[] paths;
-
- private final String namePrefix;
-
- private int counter;
-
- protected Enumerator(final String[] basePaths, final Random random)
- {
- this.paths = basePaths;
- this.namePrefix = randomString(random);
- this.counter = 0;
- }
-
- public ID next()
- {
- final int threadNum = counter % paths.length;
- return new ID(String.format(FORMAT, this.paths[threadNum], File.separator, namePrefix, (counter++)), threadNum);
- }
- }
-
- /**
- * Creates a random byte sequence using the provided {@code random} generator and returns its hex representation.
- *
- * @param random
- * The random number generator to be used.
- * @return A hex representation of the generated byte sequence
- */
- private static final String randomString(final Random random) {
- final byte[] bytes = new byte[RANDOM_BYTES_LENGTH];
- random.nextBytes(bytes);
- return StringUtils.byteToHexString(bytes);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelAccess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelAccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelAccess.java
deleted file mode 100644
index 2b5b34d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelAccess.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.disk.iomanager;
-
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
-
-
-/**
- * A base class for readers and writers that read data from I/O manager channels, or write data to them.
- * Requests handled by channels that inherit from this class are executed asynchronously, which allows
- * write-behind for writers and pre-fetching for readers.
- *
- *
- * @param <T> The buffer type used for the underlying IO operations.
- */
-public abstract class ChannelAccess<T, R extends IORequest>
-{
- /**
- * The ID of the underlying channel.
- */
- protected final Channel.ID id;
-
- /**
- * A file channel for NIO access to the file.
- */
- protected final FileChannel fileChannel;
-
- /**
- * A request queue for submitting asynchronous requests to the corresponding
- * IO worker thread.
- */
- protected final RequestQueue<R> requestQueue;
-
- /**
- * An exception that was encountered by the asynchronous request handling thread.
- */
- protected volatile IOException exception;
-
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Creates a new channel to the path indicated by the given ID. The channel hands IO requests to
- * the given request queue to be processed.
- *
- * @param channelID The id describing the path of the file that the channel accessed.
- * @param requestQueue The queue that this channel hands its IO requests to.
- * @param writeEnabled Flag describing whether the channel should be opened in read/write mode, rather
- * than in read-only mode.
- * @throws IOException Thrown, if the channel could no be opened.
- */
- protected ChannelAccess(Channel.ID channelID, RequestQueue<R> requestQueue, boolean writeEnabled)
- throws IOException
- {
- if (channelID == null || requestQueue == null) {
- throw new NullPointerException();
- }
-
- this.id = channelID;
- this.requestQueue = requestQueue;
-
- try {
- @SuppressWarnings("resource")
- RandomAccessFile file = new RandomAccessFile(id.getPath(), writeEnabled ? "rw" : "r");
- this.fileChannel = file.getChannel();
- }
- catch (IOException e) {
- throw new IOException("Channel to path '" + channelID.getPath() + "' could not be opened.", e);
- }
- }
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Checks, whether this channel has been closed;
- *
- * @return True, if the channel has been closed, false otherwise.
- */
- public abstract boolean isClosed();
-
- /**
- * This method is invoked by the asynchronous I/O thread to return a buffer after the I/O request
- * completed.
- *
- * @param buffer The buffer to be returned.
- */
- protected abstract void returnBuffer(T buffer);
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Gets the channel ID of this channel.
- *
- * @return This channel's ID.
- */
- public final Channel.ID getChannelID()
- {
- return this.id;
- }
-
- /**
- * Checks the exception state of this channel. The channel is erroneous, if one of its requests could not
- * be processed correctly.
- *
- * @throws IOException Thrown, if the channel is erroneous. The thrown exception contains the original exception
- * that defined the erroneous state as its cause.
- */
- public final void checkErroneous() throws IOException
- {
- if (this.exception != null) {
- throw new IOException("The channel is erroneous.", this.exception);
- }
- }
-
- /**
- * Deletes this channel by physically removing the file beneath it.
- * This method may only be called on a closed channel.
- */
- public void deleteChannel()
- {
- if (this.fileChannel.isOpen()) {
- throw new IllegalStateException("Cannot delete a channel that is open.");
- }
-
- // make a best effort to delete the file. Don't report exceptions.
- try {
- File f = new File(this.id.getPath());
- if (f.exists()) {
- f.delete();
- }
- } catch (Throwable t) {}
- }
-
- /**
- * Handles a processed <tt>Buffer</tt>. This method is invoked by the
- * asynchronous IO worker threads upon completion of the IO request with the
- * provided buffer and/or an exception that occurred while processing the request
- * for that buffer.
- *
- * @param buffer The buffer to be processed.
- * @param ex The exception that occurred in the I/O threads when processing the buffer's request.
- */
- final void handleProcessedBuffer(T buffer, IOException ex) {
-
- if (ex != null && this.exception == null) {
- this.exception = ex;
- }
-
- returnBuffer(buffer);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
index 25aa289..d85ec82 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
@@ -164,8 +164,7 @@ public class ChannelReaderInputView extends AbstractPagedInputView {
* @return A list containing all memory segments originally supplied to this view.
* @throws IOException Thrown, if the underlying reader could not be properly closed.
*/
- public List<MemorySegment> close() throws IOException
- {
+ public List<MemorySegment> close() throws IOException {
if (this.closed) {
throw new IllegalStateException("Already closed.");
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
index f230333..9824d34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
@@ -81,8 +81,7 @@ public final class ChannelWriterOutputView extends AbstractPagedOutputView {
* @param memory The memory used to buffer data, or null, to utilize solely the return queue.
* @param segmentSize The size of the memory segments.
*/
- public ChannelWriterOutputView(BlockChannelWriter writer, List<MemorySegment> memory, int segmentSize)
- {
+ public ChannelWriterOutputView(BlockChannelWriter writer, List<MemorySegment> memory, int segmentSize) {
super(segmentSize, HEADER_LENGTH);
if (writer == null) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
new file mode 100644
index 0000000..7c9d31b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.util.StringUtils;
+
+/**
+ * A Channel represents a collection of files that belong logically to the same resource. An example is a collection of
+ * files that contain sorted runs of data from the same stream, that will later on be merged together.
+ */
+public interface FileIOChannel {
+
+ /**
+ * Gets the channel ID of this I/O channel.
+ *
+ * @return The channel ID.
+ */
+ FileIOChannel.ID getChannelID();
+
+ /**
+ * Checks whether the channel has been closed.
+ *
+ * @return True if the channel has been closed, false otherwise.
+ */
+ boolean isClosed();
+
+ /**
+ * Closes the channel. For asynchronous implementations, this method waits until all pending requests are
+ * handled. Even if an exception interrupts the closing, the underlying <tt>FileChannel</tt> is closed.
+ *
+ * @throws IOException Thrown, if an error occurred while waiting for pending requests.
+ */
+ void close() throws IOException;
+
+ /**
+ * Deletes the file underlying this I/O channel.
+ *
+ * @throws IllegalStateException Thrown, when the channel is still open.
+ */
+ void deleteChannel();
+
+ /**
+ * Closes the channel and deletes the underlying file.
+ * For asynchronous implementations, this method waits until all pending requests are handled;
+ *
+ * @throws IOException Thrown, if an error occurred while waiting for pending requests.
+ */
+ public void closeAndDelete() throws IOException;
+
+ // --------------------------------------------------------------------------------------------
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * An ID identifying an underlying file channel.
+ */
+ public static class ID {
+
+ private static final int RANDOM_BYTES_LENGTH = 16;
+
+ private final String path;
+
+ private final int threadNum;
+
+ protected ID(String path, int threadNum) {
+ this.path = path;
+ this.threadNum = threadNum;
+ }
+
+ protected ID(String basePath, int threadNum, Random random) {
+ this.path = basePath + File.separator + randomString(random) + ".channel";
+ this.threadNum = threadNum;
+ }
+
+ /**
+ * Returns the path to the underlying temporary file.
+ */
+ public String getPath() {
+ return path;
+ }
+
+ int getThreadNum() {
+ return this.threadNum;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof ID) {
+ ID other = (ID) obj;
+ return this.path.equals(other.path) && this.threadNum == other.threadNum;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return path.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return path;
+ }
+
+ private static final String randomString(final Random random) {
+ final byte[] bytes = new byte[RANDOM_BYTES_LENGTH];
+ random.nextBytes(bytes);
+ return StringUtils.byteToHexString(bytes);
+ }
+ }
+
+ /**
+ * An enumerator for channels that logically belong together.
+ */
+ public static final class Enumerator {
+
+ private static final String FORMAT = "%s%s%s.%06d.channel";
+
+ private final String[] paths;
+
+ private final String namePrefix;
+
+ private int counter;
+
+ protected Enumerator(String[] basePaths, Random random) {
+ this.paths = basePaths;
+ this.namePrefix = ID.randomString(random);
+ this.counter = 0;
+ }
+
+ public ID next() {
+ final int threadNum = counter % paths.length;
+ return new ID(String.format(FORMAT, this.paths[threadNum], File.separator, namePrefix, (counter++)), threadNum);
+ }
+ }
+}