You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/03/18 17:48:59 UTC
[10/13] flink git commit: [FLINK-1350] [runtime] Add blocking result
partition variant
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
index 8889f70..cca04b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
@@ -19,10 +19,13 @@
package org.apache.flink.runtime.io.disk.iomanager;
import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.util.event.NotificationListener;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -37,36 +40,42 @@ import static com.google.common.base.Preconditions.checkNotNull;
* @param <R> The type of request (e.g. <tt>ReadRequest</tt> or <tt>WriteRequest</tt> issued by this access to the I/O threads.
*/
public abstract class AsynchronousFileIOChannel<T, R extends IORequest> extends AbstractFileIOChannel {
-
- /** The lock that is used during closing to synchronize the thread that waits for all
- * requests to be handled with the asynchronous I/O thread. */
+
+ private final Object listenerLock = new Object();
+
+ /**
+ * The lock that is used during closing to synchronize the thread that waits for all
+ * requests to be handled with the asynchronous I/O thread.
+ */
protected final Object closeLock = new Object();
-
+
/** A request queue for submitting asynchronous requests to the corresponding IO worker thread. */
protected final RequestQueue<R> requestQueue;
-
+
/** An atomic integer that counts the number of requests that we still wait for to return. */
protected final AtomicInteger requestsNotReturned = new AtomicInteger(0);
/** Handler for completed requests */
protected final RequestDoneCallback<T> resultHandler;
-
- /** An exception that was encountered by the asynchronous request handling thread.*/
+
+ /** An exception that was encountered by the asynchronous request handling thread. */
protected volatile IOException exception;
-
+
/** Flag marking this channel as closed */
protected volatile boolean closed;
+ private NotificationListener allRequestsProcessedListener;
+
// --------------------------------------------------------------------------------------------
-
+
/**
* Creates a new channel access to the path indicated by the given ID. The channel accepts buffers to be
- * read/written and hands them to the asynchronous I/O thread. After being processed, the buffers
+ * read/written and hands them to the asynchronous I/O thread. After being processed, the buffers
* are returned by adding the to the given queue.
- *
- * @param channelID The id describing the path of the file that the channel accessed.
+ *
+ * @param channelID The id describing the path of the file that the channel accessed.
* @param requestQueue The queue that this channel hands its IO requests to.
- * @param callback The callback to be invoked when a request is done.
+ * @param callback The callback to be invoked when a request is done.
* @param writeEnabled Flag describing whether the channel should be opened in read/write mode, rather
* than in read-only mode.
* @throws IOException Thrown, if the channel could no be opened.
@@ -79,21 +88,25 @@ public abstract class AsynchronousFileIOChannel<T, R extends IORequest> extends
this.requestQueue = checkNotNull(requestQueue);
this.resultHandler = checkNotNull(callback);
}
-
+
// --------------------------------------------------------------------------------------------
-
+
@Override
public boolean isClosed() {
return this.closed;
}
-
+
/**
- * Closes the reader and waits until all pending asynchronous requests are
- * handled. Even if an exception interrupts the closing, the underlying <tt>FileChannel</tt> is closed.
- *
+ * Closes the channel and waits until all pending asynchronous requests are processed. The
+ * underlying <code>FileChannel</code> is closed even if an exception interrupts the closing.
+ *
+ * <p> <strong>Important:</strong> the {@link #isClosed()} method returns <code>true</code>
+ * immediately after this method has been called even when there are outstanding requests.
+ *
* @throws IOException Thrown, if an I/O exception occurred while waiting for the buffers, or if
* the closing was interrupted.
*/
+ @Override
public void close() throws IOException {
// atomically set the close flag
synchronized (this.closeLock) {
@@ -101,7 +114,7 @@ public abstract class AsynchronousFileIOChannel<T, R extends IORequest> extends
return;
}
this.closed = true;
-
+
try {
// wait until as many buffers have been returned as were written
// only then is everything guaranteed to be consistent.
@@ -136,9 +149,10 @@ public abstract class AsynchronousFileIOChannel<T, R extends IORequest> extends
* <p>
* Even if an exception interrupts the closing, such that not all request are handled,
* the underlying <tt>FileChannel</tt> is closed and deleted.
- *
+ *
* @throws IOException Thrown, if an I/O exception occurred while waiting for the buffers, or if the closing was interrupted.
*/
+ @Override
public void closeAndDelete() throws IOException {
try {
close();
@@ -147,11 +161,11 @@ public abstract class AsynchronousFileIOChannel<T, R extends IORequest> extends
deleteChannel();
}
}
-
+
/**
* Checks the exception state of this channel. The channel is erroneous, if one of its requests could not
* be processed correctly.
- *
+ *
* @throws IOException Thrown, if the channel is erroneous. The thrown exception contains the original exception
* that defined the erroneous state as its cause.
*/
@@ -160,15 +174,15 @@ public abstract class AsynchronousFileIOChannel<T, R extends IORequest> extends
throw this.exception;
}
}
-
+
/**
* Handles a processed <tt>Buffer</tt>. This method is invoked by the
* asynchronous IO worker threads upon completion of the IO request with the
* provided buffer and/or an exception that occurred while processing the request
* for that buffer.
- *
+ *
* @param buffer The buffer to be processed.
- * @param ex The exception that occurred in the I/O threads when processing the buffer's request.
+ * @param ex The exception that occurred in the I/O threads when processing the buffer's request.
*/
final protected void handleProcessedBuffer(T buffer, IOException ex) {
if (buffer == null) {
@@ -186,13 +200,26 @@ public abstract class AsynchronousFileIOChannel<T, R extends IORequest> extends
}
}
finally {
- // decrement the number of missing buffers. If we are currently closing, notify the waiters
+ NotificationListener listener = null;
+
+ // Decrement the number of outstanding requests. If we are currently closing, notify the
+ // waiters. If there is a listener, notify her as well.
synchronized (this.closeLock) {
- final int num = this.requestsNotReturned.decrementAndGet();
- if (this.closed && num == 0) {
- this.closeLock.notifyAll();
+ if (this.requestsNotReturned.decrementAndGet() == 0) {
+ if (this.closed) {
+ this.closeLock.notifyAll();
+ }
+
+ synchronized (listenerLock) {
+ listener = allRequestsProcessedListener;
+ allRequestsProcessedListener = null;
+ }
}
}
+
+ if (listener != null) {
+ listener.onNotification();
+ }
}
}
@@ -202,14 +229,57 @@ public abstract class AsynchronousFileIOChannel<T, R extends IORequest> extends
// write the current buffer and get the next one
this.requestsNotReturned.incrementAndGet();
+
if (this.closed || this.requestQueue.isClosed()) {
// if we found ourselves closed after the counter increment,
// decrement the counter again and do not forward the request
this.requestsNotReturned.decrementAndGet();
+
+ final NotificationListener listener;
+
+ synchronized (listenerLock) {
+ listener = allRequestsProcessedListener;
+ allRequestsProcessedListener = null;
+ }
+
+ if (listener != null) {
+ listener.onNotification();
+ }
+
throw new IOException("I/O channel already closed. Could not fulfill: " + request);
}
+
this.requestQueue.add(request);
}
+
+ /**
+ * Registers a listener to be notified when all outstanding requests have been processed.
+ *
+ * <p> New requests can arrive right after the listener got notified. Therefore, it is not safe
+ * to assume that the number of outstanding requests is still zero after a notification unless
+ * there was a close right before the listener got called.
+ *
+ * <p> Returns <code>true</code>, if the registration was successful. A registration can fail,
+ * if there are no outstanding requests when trying to register a listener.
+ */
+ protected boolean registerAllRequestsProcessedListener(NotificationListener listener) throws IOException {
+ checkNotNull(listener);
+
+ synchronized (listenerLock) {
+ if (allRequestsProcessedListener == null) {
+ // There was a race with the processing of the last outstanding request
+ if (requestsNotReturned.get() == 0) {
+ return false;
+ }
+
+ allRequestsProcessedListener = listener;
+
+ return true;
+ }
+ }
+
+ throw new IllegalStateException("Already subscribed.");
+ }
}
//--------------------------------------------------------------------------------------------
@@ -218,11 +288,11 @@ public abstract class AsynchronousFileIOChannel<T, R extends IORequest> extends
* Read request that reads an entire memory segment from a block reader.
*/
final class SegmentReadRequest implements ReadRequest {
-
+
private final AsynchronousFileIOChannel<MemorySegment, ReadRequest> channel;
-
+
private final MemorySegment segment;
-
+
protected SegmentReadRequest(AsynchronousFileIOChannel<MemorySegment, ReadRequest> targetChannel, MemorySegment segment) {
this.channel = targetChannel;
this.segment = segment;
@@ -254,11 +324,11 @@ final class SegmentReadRequest implements ReadRequest {
* Write request that writes an entire memory segment to the block writer.
*/
final class SegmentWriteRequest implements WriteRequest {
-
+
private final AsynchronousFileIOChannel<MemorySegment, WriteRequest> channel;
-
+
private final MemorySegment segment;
-
+
protected SegmentWriteRequest(AsynchronousFileIOChannel<MemorySegment, WriteRequest> targetChannel, MemorySegment segment) {
this.channel = targetChannel;
this.segment = segment;
@@ -280,6 +350,135 @@ final class SegmentWriteRequest implements WriteRequest {
}
}
+final class BufferWriteRequest implements WriteRequest {
+
+ private final AsynchronousFileIOChannel<Buffer, WriteRequest> channel;
+
+ private final Buffer buffer;
+
+ protected BufferWriteRequest(AsynchronousFileIOChannel<Buffer, WriteRequest> targetChannel, Buffer buffer) {
+ this.channel = checkNotNull(targetChannel);
+ this.buffer = checkNotNull(buffer);
+ }
+
+ @Override
+ public void write() throws IOException {
+ final ByteBuffer header = ByteBuffer.allocateDirect(8);
+
+ header.putInt(buffer.isBuffer() ? 1 : 0);
+ header.putInt(buffer.getSize());
+ header.flip();
+
+ channel.fileChannel.write(header);
+ channel.fileChannel.write(buffer.getNioBuffer());
+ }
+
+ @Override
+ public void requestDone(IOException error) {
+ channel.handleProcessedBuffer(buffer, error);
+ }
+}
+
+final class BufferReadRequest implements ReadRequest {
+
+ private final AsynchronousFileIOChannel<Buffer, ReadRequest> channel;
+
+ private final Buffer buffer;
+
+ private final AtomicBoolean hasReachedEndOfFile;
+
+ protected BufferReadRequest(AsynchronousFileIOChannel<Buffer, ReadRequest> targetChannel, Buffer buffer, AtomicBoolean hasReachedEndOfFile) {
+ this.channel = targetChannel;
+ this.buffer = buffer;
+ this.hasReachedEndOfFile = hasReachedEndOfFile;
+ }
+
+ @Override
+ public void read() throws IOException {
+
+ final FileChannel fileChannel = channel.fileChannel;
+
+ if (fileChannel.size() - fileChannel.position() > 0) {
+ final ByteBuffer header = ByteBuffer.allocateDirect(8);
+
+ fileChannel.read(header);
+ header.flip();
+
+ final boolean isBuffer = header.getInt() == 1;
+ final int size = header.getInt();
+
+ if (size > buffer.getMemorySegment().size()) {
+ throw new IllegalStateException("Buffer is too small for data: " + buffer.getMemorySegment().size() + " bytes available, but " + size + " needed. This is most likely due to an serialized event, which is larger than the buffer size.");
+ }
+
+ buffer.setSize(size);
+
+ fileChannel.read(buffer.getNioBuffer());
+
+ if (!isBuffer) {
+ buffer.tagAsEvent();
+ }
+
+ hasReachedEndOfFile.set(fileChannel.size() - fileChannel.position() == 0);
+ }
+ else {
+ hasReachedEndOfFile.set(true);
+ }
+ }
+
+ @Override
+ public void requestDone(IOException error) {
+ channel.handleProcessedBuffer(buffer, error);
+ }
+}
+
+final class FileSegmentReadRequest implements ReadRequest {
+
+ private final AsynchronousFileIOChannel<FileSegment, ReadRequest> channel;
+
+ private final AtomicBoolean hasReachedEndOfFile;
+
+ private FileSegment fileSegment;
+
+ protected FileSegmentReadRequest(AsynchronousFileIOChannel<FileSegment, ReadRequest> targetChannel, AtomicBoolean hasReachedEndOfFile) {
+ this.channel = targetChannel;
+ this.hasReachedEndOfFile = hasReachedEndOfFile;
+ }
+
+ @Override
+ public void read() throws IOException {
+
+ final FileChannel fileChannel = channel.fileChannel;
+
+ if (fileChannel.size() - fileChannel.position() > 0) {
+ final ByteBuffer header = ByteBuffer.allocateDirect(8);
+
+ fileChannel.read(header);
+ header.flip();
+
+ final long position = fileChannel.position();
+
+ final boolean isBuffer = header.getInt() == 1;
+ final int length = header.getInt();
+
+ fileSegment = new FileSegment(fileChannel, position, length, isBuffer);
+
+ // Skip the binary dataa
+ fileChannel.position(position + length);
+
+ hasReachedEndOfFile.set(fileChannel.size() - fileChannel.position() == 0);
+ }
+ else {
+ hasReachedEndOfFile.set(true);
+ }
+ }
+
+ @Override
+ public void requestDone(IOException error) {
+ channel.handleProcessedBuffer(fileSegment, error);
+ }
+}
+
/**
* Request that seeks the underlying file channel to the given position.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java
index 8f7f218..957052e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java
@@ -21,15 +21,13 @@ package org.apache.flink.runtime.io.disk.iomanager;
import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.flink.core.memory.MemorySegment;
-
/**
* A reader that reads data in blocks from a file channel. The reader reads the blocks into a
* {@link org.apache.flink.core.memory.MemorySegment}. To support asynchronous implementations,
* the read method does not immediately return the full memory segment, but rather adds it to
* a blocking queue of finished read operations.
*/
-public interface BlockChannelReader extends FileIOChannel {
+public interface BlockChannelReader<T> extends FileIOChannel {
/**
* Issues a read request, which will fill the given segment with the next block in the
@@ -39,33 +37,27 @@ public interface BlockChannelReader extends FileIOChannel {
* @param segment The segment to read the block into.
* @throws IOException Thrown, when the reader encounters an I/O error.
*/
- void readBlock(MemorySegment segment) throws IOException;
+ void readBlock(T segment) throws IOException;
+
+ void seekToPosition(long position) throws IOException;
/**
* Gets the next memory segment that has been filled with data by the reader. This method blocks until
* such a segment is available, or until an error occurs in the reader, or the reader is closed.
* <p>
* WARNING: If this method is invoked without any segment ever returning (for example, because the
- * {@link #readBlock(MemorySegment)} method has not been invoked appropriately), the method may block
+ * {@link #readBlock(T)} method has not been invoked appropriately), the method may block
* forever.
*
* @return The next memory segment from the reader's return queue.
* @throws IOException Thrown, if an I/O error occurs in the reader while waiting for the request to return.
*/
- public MemorySegment getNextReturnedSegment() throws IOException;
+ public T getNextReturnedBlock() throws IOException;
/**
* Gets the queue in which the full memory segments are queued after the read is complete.
*
* @return The queue with the full memory segments.
*/
- LinkedBlockingQueue<MemorySegment> getReturnQueue();
-
- /**
- * Seeks the underlying file channel to the given position.
- *
- * @param position The position to seek to.
- */
- void seekToPosition(long position) throws IOException;
+ LinkedBlockingQueue<T> getReturnQueue();
}
-
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriter.java
index 25c74e4..ccf065a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriter.java
@@ -21,15 +21,13 @@ package org.apache.flink.runtime.io.disk.iomanager;
import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.flink.core.memory.MemorySegment;
-
/**
* A writer that writes data in blocks to a file channel. The writer receives the data blocks in the form of
* {@link org.apache.flink.core.memory.MemorySegment}, which it writes entirely to the channel,
* regardless of how space in the segment is used. The writing may be realized synchronously, or asynchronously,
* depending on the implementation.
*/
-public interface BlockChannelWriter extends BlockChannelWriterWithCallback {
+public interface BlockChannelWriter<T> extends BlockChannelWriterWithCallback<T> {
/**
* Gets the next memory segment that has been written and is available again.
@@ -37,13 +35,13 @@ public interface BlockChannelWriter extends BlockChannelWriterWithCallback {
* writer is closed.
* <p>
* NOTE: If this method is invoked without any segment ever returning (for example, because the
- * {@link #writeBlock(MemorySegment)} method has not been invoked accordingly), the method may block
+ * {@link #writeBlock(T)} method has not been invoked accordingly), the method may block
* forever.
*
* @return The next memory segment from the writers's return queue.
* @throws IOException Thrown, if an I/O error occurs in the writer while waiting for the request to return.
*/
- MemorySegment getNextReturnedSegment() throws IOException;
+ T getNextReturnedBlock() throws IOException;
/**
* Gets the queue in which the memory segments are queued after the asynchronous write
@@ -51,5 +49,5 @@ public interface BlockChannelWriter extends BlockChannelWriterWithCallback {
*
* @return The queue with the written memory segments.
*/
- LinkedBlockingQueue<MemorySegment> getReturnQueue();
+ LinkedBlockingQueue<T> getReturnQueue();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java
index 57bc7e0..f7618e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java
@@ -20,16 +20,14 @@ package org.apache.flink.runtime.io.disk.iomanager;
import java.io.IOException;
-import org.apache.flink.core.memory.MemorySegment;
+public interface BlockChannelWriterWithCallback<T> extends FileIOChannel {
-public interface BlockChannelWriterWithCallback extends FileIOChannel {
-
/**
- * Writes the given memory segment. The request may be executed synchronously, or asynchronously, depending
+ * Writes the given block. The request may be executed synchronously, or asynchronously, depending
* on the implementation.
- *
- * @param segment The segment to be written.
+ *
+ * @param block The segment to be written.
* @throws IOException Thrown, when the writer encounters an I/O error.
*/
- void writeBlock(MemorySegment segment) throws IOException;
+ void writeBlock(T block) throws IOException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileReader.java
new file mode 100644
index 0000000..74999e2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileReader.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import java.io.IOException;
+
+public interface BufferFileReader extends FileIOChannel {
+
+ void readInto(Buffer buffer) throws IOException;
+
+ void seekToPosition(long position) throws IOException;
+
+ boolean hasReachedEndOfFile();
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileSegmentReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileSegmentReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileSegmentReader.java
new file mode 100644
index 0000000..fa25d4f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileSegmentReader.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+
+public interface BufferFileSegmentReader extends FileIOChannel {
+
+ void read() throws IOException;
+
+ void seekTo(long position) throws IOException;
+
+ boolean hasReachedEndOfFile();
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriter.java
new file mode 100644
index 0000000..704aad2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.util.event.NotificationListener;
+
+import java.io.IOException;
+
+public interface BufferFileWriter extends BlockChannelWriterWithCallback<Buffer> {
+
+ /**
+ * Returns the number of outstanding requests.
+ */
+ int getNumberOfOutstandingRequests();
+
+ /**
+ * Registers a listener, which is notified after all outstanding requests have been processed.
+ */
+ boolean registerAllRequestsProcessedListener(NotificationListener listener) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
index d85ec82..b919034 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
@@ -35,7 +35,7 @@ import org.apache.flink.runtime.memorymanager.AbstractPagedInputView;
*/
public class ChannelReaderInputView extends AbstractPagedInputView {
- protected final BlockChannelReader reader; // the block reader that reads memory segments
+ protected final BlockChannelReader<MemorySegment> reader; // the block reader that reads memory segments
protected int numRequestsRemaining; // the number of block requests remaining
@@ -63,7 +63,7 @@ public class ChannelReaderInputView extends AbstractPagedInputView {
* @throws IOException Thrown, if the read requests for the first blocks fail to be
* served by the reader.
*/
- public ChannelReaderInputView(BlockChannelReader reader, List<MemorySegment> memory, boolean waitForFirstBlock)
+ public ChannelReaderInputView(BlockChannelReader<MemorySegment> reader, List<MemorySegment> memory, boolean waitForFirstBlock)
throws IOException
{
this(reader, memory, -1, waitForFirstBlock);
@@ -89,7 +89,7 @@ public class ChannelReaderInputView extends AbstractPagedInputView {
* @throws IOException Thrown, if the read requests for the first blocks fail to be
* served by the reader.
*/
- public ChannelReaderInputView(BlockChannelReader reader, List<MemorySegment> memory,
+ public ChannelReaderInputView(BlockChannelReader<MemorySegment> reader, List<MemorySegment> memory,
int numBlocks, boolean waitForFirstBlock)
throws IOException
{
@@ -117,7 +117,7 @@ public class ChannelReaderInputView extends AbstractPagedInputView {
*
* @throws IOException
*/
- ChannelReaderInputView(BlockChannelReader reader, List<MemorySegment> memory,
+ ChannelReaderInputView(BlockChannelReader<MemorySegment> reader, List<MemorySegment> memory,
int numBlocks, int headerLen, boolean waitForFirstBlock)
throws IOException
{
@@ -225,7 +225,7 @@ public class ChannelReaderInputView extends AbstractPagedInputView {
}
// get the next segment
- final MemorySegment seg = this.reader.getNextReturnedSegment();
+ final MemorySegment seg = this.reader.getNextReturnedBlock();
// check the header
if (seg.getShort(0) != ChannelWriterOutputView.HEADER_MAGIC_NUMBER) {
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
index 9824d34..089e10a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
@@ -61,7 +61,7 @@ public final class ChannelWriterOutputView extends AbstractPagedOutputView {
// --------------------------------------------------------------------------------------------
- private final BlockChannelWriter writer; // the writer to the channel
+ private final BlockChannelWriter<MemorySegment> writer; // the writer to the channel
private long bytesBeforeSegment; // the number of bytes written before the current memory segment
@@ -81,7 +81,7 @@ public final class ChannelWriterOutputView extends AbstractPagedOutputView {
* @param memory The memory used to buffer data, or null, to utilize solely the return queue.
* @param segmentSize The size of the memory segments.
*/
- public ChannelWriterOutputView(BlockChannelWriter writer, List<MemorySegment> memory, int segmentSize) {
+ public ChannelWriterOutputView(BlockChannelWriter<MemorySegment> writer, List<MemorySegment> memory, int segmentSize) {
super(segmentSize, HEADER_LENGTH);
if (writer == null) {
@@ -123,7 +123,7 @@ public final class ChannelWriterOutputView extends AbstractPagedOutputView {
* @param writer The writer to write to.
* @param segmentSize The size of the memory segments.
*/
- public ChannelWriterOutputView(BlockChannelWriter writer, int segmentSize)
+ public ChannelWriterOutputView(BlockChannelWriter<MemorySegment> writer, int segmentSize)
{
this(writer, null, segmentSize);
}
@@ -203,7 +203,7 @@ public final class ChannelWriterOutputView extends AbstractPagedOutputView {
writeSegment(current, posInSegment, false);
}
- final MemorySegment next = this.writer.getNextReturnedSegment();
+ final MemorySegment next = this.writer.getNextReturnedBlock();
this.blockCount++;
return next;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
index c5a3daa..f9ee90c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.disk.iomanager;
import java.io.File;
import java.io.IOException;
+import java.nio.channels.FileChannel;
import java.util.Random;
import org.apache.flink.util.StringUtils;
@@ -73,6 +74,8 @@ public interface FileIOChannel {
* @throws IOException Thrown, if an error occurred while waiting for pending requests.
*/
public void closeAndDelete() throws IOException;
+
+ FileChannel getNioFileChannel();
// --------------------------------------------------------------------------------------------
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileSegment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileSegment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileSegment.java
new file mode 100644
index 0000000..7c3a83e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileSegment.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.nio.channels.FileChannel;
+
+public class FileSegment {
+
+ private final FileChannel fileChannel;
+ private final long position;
+ private final int length;
+ private final boolean isBuffer;
+
+ public FileSegment(FileChannel fileChannel, long position, int length, boolean isBuffer) {
+ this.fileChannel = fileChannel;
+ this.position = position;
+ this.length = length;
+ this.isBuffer = isBuffer;
+ }
+
+ public FileChannel getFileChannel() {
+ return fileChannel;
+ }
+
+ public long getPosition() {
+ return position;
+ }
+
+ public int getLength() {
+ return length;
+ }
+
+ public boolean isBuffer() {
+ return isBuffer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/HeaderlessChannelReaderInputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/HeaderlessChannelReaderInputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/HeaderlessChannelReaderInputView.java
index cdad3fb..63e86c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/HeaderlessChannelReaderInputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/HeaderlessChannelReaderInputView.java
@@ -60,7 +60,7 @@ public class HeaderlessChannelReaderInputView extends ChannelReaderInputView
* @throws IOException Thrown, if the read requests for the first blocks fail to be
* served by the reader.
*/
- public HeaderlessChannelReaderInputView(BlockChannelReader reader, List<MemorySegment> memory, int numBlocks,
+ public HeaderlessChannelReaderInputView(BlockChannelReader<MemorySegment> reader, List<MemorySegment> memory, int numBlocks,
int numBytesInLastBlock, boolean waitForFirstBlock)
throws IOException
{
@@ -87,7 +87,7 @@ public class HeaderlessChannelReaderInputView extends ChannelReaderInputView
// get the next segment
this.numBlocksRemaining--;
- return this.reader.getNextReturnedSegment();
+ return this.reader.getNextReturnedBlock();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
index c04ba97..c1a4b84 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.disk.iomanager;
import org.apache.commons.io.FileUtils;
import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,6 +36,21 @@ import java.util.concurrent.LinkedBlockingQueue;
*/
public abstract class IOManager {
+ public enum IOMode {
+
+ SYNC(true), ASYNC(false);
+
+ private final boolean isSynchronous;
+
+ IOMode(boolean isSynchronous) {
+ this.isSynchronous = isSynchronous;
+ }
+
+ public boolean isSynchronous() {
+ return isSynchronous;
+ }
+ }
+
/** Logging */
protected static final Logger LOG = LoggerFactory.getLogger(IOManager.class);
@@ -190,7 +206,7 @@ public abstract class IOManager {
* @return A block channel writer that writes to the given channel.
* @throws IOException Thrown, if the channel for the writer could not be opened.
*/
- public BlockChannelWriter createBlockChannelWriter(FileIOChannel.ID channelID) throws IOException {
+ public BlockChannelWriter<MemorySegment> createBlockChannelWriter(FileIOChannel.ID channelID) throws IOException {
return createBlockChannelWriter(channelID, new LinkedBlockingQueue<MemorySegment>());
}
@@ -203,7 +219,7 @@ public abstract class IOManager {
* @return A block channel writer that writes to the given channel.
* @throws IOException Thrown, if the channel for the writer could not be opened.
*/
- public abstract BlockChannelWriter createBlockChannelWriter(FileIOChannel.ID channelID,
+ public abstract BlockChannelWriter<MemorySegment> createBlockChannelWriter(FileIOChannel.ID channelID,
LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException;
/**
@@ -216,7 +232,7 @@ public abstract class IOManager {
* @return A block channel writer that writes to the given channel.
* @throws IOException Thrown, if the channel for the writer could not be opened.
*/
- public abstract BlockChannelWriterWithCallback createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback<MemorySegment> callback) throws IOException;
+ public abstract BlockChannelWriterWithCallback<MemorySegment> createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback<MemorySegment> callback) throws IOException;
/**
* Creates a block channel reader that reads blocks from the given channel. The reader pushed
@@ -227,7 +243,7 @@ public abstract class IOManager {
* @return A block channel reader that reads from the given channel.
* @throws IOException Thrown, if the channel for the reader could not be opened.
*/
- public BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID) throws IOException {
+ public BlockChannelReader<MemorySegment> createBlockChannelReader(FileIOChannel.ID channelID) throws IOException {
return createBlockChannelReader(channelID, new LinkedBlockingQueue<MemorySegment>());
}
@@ -240,9 +256,15 @@ public abstract class IOManager {
* @return A block channel reader that reads from the given channel.
* @throws IOException Thrown, if the channel for the reader could not be opened.
*/
- public abstract BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID,
+ public abstract BlockChannelReader<MemorySegment> createBlockChannelReader(FileIOChannel.ID channelID,
LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException;
+ public abstract BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID) throws IOException;
+
+ public abstract BufferFileReader createBufferFileReader(FileIOChannel.ID channelID, RequestDoneCallback<Buffer> callback) throws IOException;
+
+ public abstract BufferFileSegmentReader createBufferFileSegmentReader(FileIOChannel.ID channelID, RequestDoneCallback<FileSegment> callback) throws IOException;
+
/**
* Creates a block channel reader that reads all blocks from the given channel directly in one bulk.
* The reader draws segments to read the blocks into from a supplied list, which must contain as many
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
index 2396665..e615913 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.disk.iomanager;
import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.util.EnvironmentInformation;
import java.io.IOException;
@@ -143,7 +144,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
}
}
finally {
- // make sure we all the super implementation in any case and at the last point,
+ // make sure we call the super implementation in any case and at the last point,
// because this will clean up the I/O directories
super.shutdown();
}
@@ -182,7 +183,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
// ------------------------------------------------------------------------
@Override
- public BlockChannelWriter createBlockChannelWriter(FileIOChannel.ID channelID,
+ public BlockChannelWriter<MemorySegment> createBlockChannelWriter(FileIOChannel.ID channelID,
LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException
{
checkState(!isShutdown.get(), "I/O-Manger is shut down.");
@@ -190,7 +191,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
}
@Override
- public BlockChannelWriterWithCallback createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback<MemorySegment> callback) throws IOException {
+ public BlockChannelWriterWithCallback<MemorySegment> createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback<MemorySegment> callback) throws IOException {
checkState(!isShutdown.get(), "I/O-Manger is shut down.");
return new AsynchronousBlockWriterWithCallback(channelID, this.writers[channelID.getThreadNum()].requestQueue, callback);
}
@@ -206,13 +207,34 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
* @throws IOException Thrown, if the channel for the reader could not be opened.
*/
@Override
- public BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID,
+ public BlockChannelReader<MemorySegment> createBlockChannelReader(FileIOChannel.ID channelID,
LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException
{
checkState(!isShutdown.get(), "I/O-Manger is shut down.");
return new AsynchronousBlockReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, returnQueue);
}
-
+
+ @Override
+ public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID) throws IOException {
+ checkState(!isShutdown.get(), "I/O-Manger is shut down.");
+
+ return new AsynchronousBufferFileWriter(channelID, writers[channelID.getThreadNum()].requestQueue);
+ }
+
+ @Override
+ public BufferFileReader createBufferFileReader(FileIOChannel.ID channelID, RequestDoneCallback<Buffer> callback) throws IOException {
+ checkState(!isShutdown.get(), "I/O-Manger is shut down.");
+
+ return new AsynchronousBufferFileReader(channelID, readers[channelID.getThreadNum()].requestQueue, callback);
+ }
+
+ @Override
+ public BufferFileSegmentReader createBufferFileSegmentReader(FileIOChannel.ID channelID, RequestDoneCallback<FileSegment> callback) throws IOException {
+ checkState(!isShutdown.get(), "I/O-Manger is shut down.");
+
+ return new AsynchronousBufferFileSegmentReader(channelID, readers[channelID.getThreadNum()].requestQueue, callback);
+ }
+
/**
* Creates a block channel reader that reads all blocks from the given channel directly in one bulk.
* The reader draws segments to read the blocks into from a supplied list, which must contain as many
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java
index 95f3dc7..a2e3e82 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java
@@ -21,26 +21,24 @@ package org.apache.flink.runtime.io.disk.iomanager;
import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.flink.core.memory.MemorySegment;
-
/**
* A {@link RequestDoneCallback} that adds the memory segments to a blocking queue.
*/
-public class QueuingCallback implements RequestDoneCallback<MemorySegment> {
+public class QueuingCallback<T> implements RequestDoneCallback<T> {
+
+ private final LinkedBlockingQueue<T> queue;
- private final LinkedBlockingQueue<MemorySegment> queue;
-
- public QueuingCallback(LinkedBlockingQueue<MemorySegment> queue) {
+ public QueuingCallback(LinkedBlockingQueue<T> queue) {
this.queue = queue;
}
@Override
- public void requestSuccessful(MemorySegment buffer) {
+ public void requestSuccessful(T buffer) {
queue.add(buffer);
}
@Override
- public void requestFailed(MemorySegment buffer, IOException e) {
+ public void requestFailed(T buffer, IOException e) {
// the I/O error is recorded in the writer already
queue.add(buffer);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousBufferFileReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousBufferFileReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousBufferFileReader.java
new file mode 100644
index 0000000..27189cb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousBufferFileReader.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * A synchronous {@link BufferFileReader} implementation.
+ *
+ * <p> This currently bypasses the I/O manager as it is the only synchronous implementation, which
+ * is currently in use.
+ *
+ * TODO Refactor I/O manager setup and refactor this into it
+ */
+public class SynchronousBufferFileReader extends SynchronousFileIOChannel implements BufferFileReader {
+
+ private final ByteBuffer header = ByteBuffer.allocateDirect(8);
+
+ private boolean hasReachedEndOfFile;
+
+ public SynchronousBufferFileReader(ID channelID, boolean writeEnabled) throws IOException {
+ super(channelID, writeEnabled);
+ }
+
+ @Override
+ public void readInto(Buffer buffer) throws IOException {
+ if (fileChannel.size() - fileChannel.position() > 0) {
+ // This is the synchronous counter part to the asynchronous buffer read request
+
+ // Read header
+ header.clear();
+ fileChannel.read(header);
+ header.flip();
+
+ final boolean isBuffer = header.getInt() == 1;
+ final int size = header.getInt();
+
+ if (size > buffer.getMemorySegment().size()) {
+ throw new IllegalStateException("Buffer is too small for data: " + buffer.getMemorySegment().size() + " bytes available, but " + size + " needed. This is most likely due to an serialized event, which is larger than the buffer size.");
+ }
+
+ buffer.setSize(size);
+
+ fileChannel.read(buffer.getNioBuffer());
+
+ if (!isBuffer) {
+ buffer.tagAsEvent();
+ }
+
+ hasReachedEndOfFile = fileChannel.size() - fileChannel.position() == 0;
+ }
+ else {
+ buffer.recycle();
+ }
+ }
+
+ @Override
+ public void seekToPosition(long position) throws IOException {
+ fileChannel.position(position);
+ }
+
+ @Override
+ public boolean hasReachedEndOfFile() {
+ return hasReachedEndOfFile;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousFileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousFileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousFileIOChannel.java
index fd6c230..19a0fc9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousFileIOChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousFileIOChannel.java
@@ -42,4 +42,4 @@ public abstract class SynchronousFileIOChannel extends AbstractFileIOChannel {
this.fileChannel.close();
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
new file mode 100644
index 0000000..5a31c3f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A {@link ConnectionID} identifies a connection to a remote task manager by the socket address and
+ * a connection index. This allows multiple connections to the same task manager to be distinguished
+ * by their connection index.
+ *
+ * <p> The connection index is assigned by the {@link IntermediateResult} and ensures that it is
+ * safe to multiplex multiple data transfers over the same physical TCP connection.
+ */
+public class ConnectionID implements Serializable {
+
+ private final InetSocketAddress address;
+
+ private final int connectionIndex;
+
+ public ConnectionID(InstanceConnectionInfo connectionInfo, int connectionIndex) {
+ this(new InetSocketAddress(connectionInfo.address(), connectionInfo.dataPort()), connectionIndex);
+ }
+
+ public ConnectionID(InetSocketAddress address, int connectionIndex) {
+ this.address = checkNotNull(address);
+ checkArgument(connectionIndex >= 0);
+ this.connectionIndex = connectionIndex;
+ }
+
+ public InetSocketAddress getAddress() {
+ return address;
+ }
+
+ public int getConnectionIndex() {
+ return connectionIndex;
+ }
+
+ @Override
+ public int hashCode() {
+ return address.hashCode() + (31 * connectionIndex);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other.getClass() != ConnectionID.class) {
+ return false;
+ }
+
+ final ConnectionID ra = (ConnectionID) other;
+ if (!ra.getAddress().equals(address) || ra.getConnectionIndex() != connectionIndex) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return address + " [" + connectionIndex + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
index 76f8bbd..06dc151 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
@@ -18,8 +18,9 @@
package org.apache.flink.runtime.io.network;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
-import org.apache.flink.runtime.io.network.partition.IntermediateResultPartitionProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import java.io.IOException;
@@ -29,20 +30,20 @@ import java.io.IOException;
*/
public interface ConnectionManager {
- void start(IntermediateResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) throws IOException;
+ void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher, NetworkBufferPool networkbufferPool) throws IOException;
/**
- * Creates a {@link PartitionRequestClient} instance for the given {@link RemoteAddress}.
+ * Creates a {@link PartitionRequestClient} instance for the given {@link ConnectionID}.
*/
- PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException, InterruptedException;
+ PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException, InterruptedException;
/**
* Closes opened ChannelConnections in case of a resource release
- * @param remoteAddress
*/
- void closeOpenChannelConnections(RemoteAddress remoteAddress);
+ void closeOpenChannelConnections(ConnectionID connectionId);
int getNumberOfActiveConnections();
void shutdown() throws IOException;
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
index 447f6e6..af6273e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
@@ -18,8 +18,9 @@
package org.apache.flink.runtime.io.network;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
-import org.apache.flink.runtime.io.network.partition.IntermediateResultPartitionProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import java.io.IOException;
@@ -30,16 +31,16 @@ import java.io.IOException;
public class LocalConnectionManager implements ConnectionManager {
@Override
- public void start(IntermediateResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) throws IOException {
+ public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher, NetworkBufferPool networkbufferPool) throws IOException {
}
@Override
- public PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException {
+ public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException {
return null;
}
@Override
- public void closeOpenChannelConnections(RemoteAddress remoteAddress) {}
+ public void closeOpenChannelConnections(ConnectionID connectionId) {}
@Override
public int getNumberOfActiveConnections() {
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 58b21e1..e02e744 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -21,13 +21,14 @@ package org.apache.flink.runtime.io.network;
import akka.actor.ActorRef;
import akka.util.Timeout;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
-import org.apache.flink.runtime.io.network.partition.IntermediateResultPartition;
-import org.apache.flink.runtime.io.network.partition.IntermediateResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
import org.apache.flink.runtime.taskmanager.Task;
@@ -54,7 +55,7 @@ public class NetworkEnvironment {
private final FiniteDuration jobManagerTimeout;
- private final IntermediateResultPartitionManager partitionManager;
+ private final ResultPartitionManager partitionManager;
private final TaskEventDispatcher taskEventDispatcher;
@@ -62,6 +63,8 @@ public class NetworkEnvironment {
private final ConnectionManager connectionManager;
+ private final NetworkEnvironmentConfiguration configuration;
+
private boolean isShutdown;
/**
@@ -74,8 +77,9 @@ public class NetworkEnvironment {
this.jobManager = checkNotNull(jobManager);
this.jobManagerTimeout = checkNotNull(jobManagerTimeout);
- this.partitionManager = new IntermediateResultPartitionManager();
+ this.partitionManager = new ResultPartitionManager();
this.taskEventDispatcher = new TaskEventDispatcher();
+ this.configuration = checkNotNull(config);
// --------------------------------------------------------------------
// Network buffers
@@ -95,7 +99,7 @@ public class NetworkEnvironment {
connectionManager = nettyConfig.isDefined() ? new NettyConnectionManager(nettyConfig.get()) : new LocalConnectionManager();
try {
- connectionManager.start(partitionManager, taskEventDispatcher);
+ connectionManager.start(partitionManager, taskEventDispatcher, networkBufferPool);
}
catch (Throwable t) {
throw new IOException("Failed to instantiate network connection manager: " + t.getMessage(), t);
@@ -115,30 +119,29 @@ public class NetworkEnvironment {
}
public void registerTask(Task task) throws IOException {
- final ExecutionAttemptID executionId = task.getExecutionId();
-
- final IntermediateResultPartition[] producedPartitions = task.getProducedPartitions();
- final BufferWriter[] writers = task.getWriters();
+ final ResultPartition[] producedPartitions = task.getProducedPartitions();
+ final ResultPartitionWriter[] writers = task.getWriters();
if (writers.length != producedPartitions.length) {
throw new IllegalStateException("Unequal number of writers and partitions.");
}
for (int i = 0; i < producedPartitions.length; i++) {
- final IntermediateResultPartition partition = producedPartitions[i];
- final BufferWriter writer = writers[i];
+ final ResultPartition partition = producedPartitions[i];
+ final ResultPartitionWriter writer = writers[i];
// Buffer pool for the partition
BufferPool bufferPool = null;
try {
- bufferPool = networkBufferPool.createBufferPool(partition.getNumberOfQueues(), false);
- partition.setBufferPool(bufferPool);
+ bufferPool = networkBufferPool.createBufferPool(partition.getNumberOfSubpartitions(), false);
+ partition.registerBufferPool(bufferPool);
+
partitionManager.registerIntermediateResultPartition(partition);
}
catch (Throwable t) {
if (bufferPool != null) {
- bufferPool.destroy();
+ bufferPool.lazyDestroy();
}
if (t instanceof IOException) {
@@ -150,7 +153,7 @@ public class NetworkEnvironment {
}
// Register writer with task event dispatcher
- taskEventDispatcher.registerWriterForIncomingTaskEvents(executionId, writer.getPartitionId(), writer);
+ taskEventDispatcher.registerWriterForIncomingTaskEvents(writer.getPartitionId(), writer);
}
// Setup the buffer pool for each buffer reader
@@ -165,7 +168,7 @@ public class NetworkEnvironment {
}
catch (Throwable t) {
if (bufferPool != null) {
- bufferPool.destroy();
+ bufferPool.lazyDestroy();
}
if (t instanceof IOException) {
@@ -185,10 +188,16 @@ public class NetworkEnvironment {
final ExecutionAttemptID executionId = task.getExecutionId();
if (task.isCanceledOrFailed()) {
- partitionManager.failIntermediateResultPartitions(executionId);
+ partitionManager.releasePartitionsProducedBy(executionId);
}
- taskEventDispatcher.unregisterWriters(executionId);
+ ResultPartitionWriter[] writers = task.getWriters();
+
+ if (writers != null) {
+ for (ResultPartitionWriter writer : task.getWriters()) {
+ taskEventDispatcher.unregisterWriter(writer);
+ }
+ }
final SingleInputGate[] inputGates = task.getInputGates();
@@ -206,7 +215,7 @@ public class NetworkEnvironment {
}
}
- public IntermediateResultPartitionManager getPartitionManager() {
+ public ResultPartitionManager getPartitionManager() {
return partitionManager;
}
@@ -222,6 +231,10 @@ public class NetworkEnvironment {
return networkBufferPool;
}
+ public IOMode getDefaultIOMode() {
+ return configuration.ioMode();
+ }
+
public boolean hasReleasedAllResources() {
String msg = String.format("Network buffer pool: %d missing memory segments. %d registered buffer pools. Connection manager: %d active connections. Task event dispatcher: %d registered writers.",
networkBufferPool.getTotalNumberOfMemorySegments() - networkBufferPool.getNumberOfAvailableMemorySegments(), networkBufferPool.getNumberOfRegisteredBufferPools(), connectionManager.getNumberOfActiveConnections(), taskEventDispatcher.getNumberOfRegisteredWriters());
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteAddress.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteAddress.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteAddress.java
deleted file mode 100644
index 937055b..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteAddress.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.executiongraph.IntermediateResult;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * A {@link RemoteAddress} identifies a connection to a remote task manager by
- * the socket address and a connection index. This allows multiple connections
- * to be distinguished by their connection index.
- * <p>
- * The connection index is assigned by the {@link IntermediateResult} and
- * ensures that it is safe to multiplex multiple data transfers over the same
- * physical TCP connection.
- */
-public class RemoteAddress implements IOReadableWritable, Serializable {
-
- private InetSocketAddress address;
-
- private int connectionIndex;
-
- public RemoteAddress(InstanceConnectionInfo connectionInfo, int connectionIndex) {
- this(new InetSocketAddress(connectionInfo.address(), connectionInfo.dataPort()), connectionIndex);
- }
-
- public RemoteAddress(InetSocketAddress address, int connectionIndex) {
- this.address = checkNotNull(address);
- checkArgument(connectionIndex >= 0);
- this.connectionIndex = connectionIndex;
- }
-
- public InetSocketAddress getAddress() {
- return address;
- }
-
- public int getConnectionIndex() {
- return connectionIndex;
- }
-
- @Override
- public int hashCode() {
- return address.hashCode() + (31 * connectionIndex);
- }
-
- @Override
- public boolean equals(Object other) {
- if (other.getClass() != RemoteAddress.class) {
- return false;
- }
-
- final RemoteAddress ra = (RemoteAddress) other;
- if (!ra.getAddress().equals(address) || ra.getConnectionIndex() != connectionIndex) {
- return false;
- }
-
- return true;
- }
-
- @Override
- public String toString() {
- return address + " [" + connectionIndex + "]";
- }
-
- // ------------------------------------------------------------------------
- // Serialization
- // ------------------------------------------------------------------------
-
- public RemoteAddress() {
- this.address = null;
- this.connectionIndex = -1;
- }
-
- @Override
- public void write(final DataOutputView out) throws IOException {
- final InetAddress ia = address.getAddress();
- out.writeInt(ia.getAddress().length);
- out.write(ia.getAddress());
- out.writeInt(address.getPort());
-
- out.writeInt(connectionIndex);
- }
-
- @Override
- public void read(final DataInputView in) throws IOException {
- final byte[] addressBytes = new byte[in.readInt()];
- in.readFully(addressBytes);
-
- final InetAddress ia = InetAddress.getByAddress(addressBytes);
- int port = in.readInt();
-
- address = new InetSocketAddress(ia, port);
- connectionIndex = in.readInt();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
index 7a529b9..845f72a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
@@ -18,59 +18,49 @@
package org.apache.flink.runtime.io.network;
-import com.google.common.collect.HashBasedTable;
-import com.google.common.collect.Table;
+import com.google.common.collect.Maps;
import org.apache.flink.runtime.event.task.TaskEvent;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.util.event.EventListener;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Map;
/**
- * The task event dispatcher dispatches events flowing backwards from a consumer
- * to a producer. It only supports programs, where the producer and consumer
- * are running at the same time.
- * <p>
- * The publish method is either called from the local input channel or the
- * network I/O thread.
+ * The task event dispatcher dispatches events flowing backwards from a consuming task to the task
+ * producing the consumed result.
+ *
+ * <p> Backwards events only work for tasks, which produce pipelined results, where both the
+ * producing and consuming task are running at the same time.
*/
public class TaskEventDispatcher {
- Table<ExecutionAttemptID, IntermediateResultPartitionID, BufferWriter> registeredWriters = HashBasedTable.create();
+ private final Map<ResultPartitionID, ResultPartitionWriter> registeredWriters = Maps.newHashMap();
- public void registerWriterForIncomingTaskEvents(ExecutionAttemptID executionId, IntermediateResultPartitionID partitionId, BufferWriter listener) {
+ public void registerWriterForIncomingTaskEvents(ResultPartitionID partitionId, ResultPartitionWriter writer) {
synchronized (registeredWriters) {
- if (registeredWriters.put(executionId, partitionId, listener) != null) {
- throw new IllegalStateException("Event dispatcher already contains buffer writer.");
+ if (registeredWriters.put(partitionId, writer) != null) {
+ throw new IllegalStateException("Already registered at task event dispatcher.");
}
}
}
- public void unregisterWriters(ExecutionAttemptID executionId) {
+ public void unregisterWriter(ResultPartitionWriter writer) {
synchronized (registeredWriters) {
- List<IntermediateResultPartitionID> writersToUnregister = new ArrayList<IntermediateResultPartitionID>();
-
- for (IntermediateResultPartitionID partitionId : registeredWriters.row(executionId).keySet()) {
- writersToUnregister.add(partitionId);
- }
-
- for(IntermediateResultPartitionID partitionId : writersToUnregister) {
- registeredWriters.remove(executionId, partitionId);
- }
+ registeredWriters.remove(writer.getPartitionId());
}
}
/**
- * Publishes the event to the registered {@link EventListener} instance.
+ * Publishes the event to the registered {@link ResultPartitionWriter} instances.
* <p>
- * This method is either called from a local input channel or the network
- * I/O thread on behalf of a remote input channel.
+ * This method is either called directly from a {@link LocalInputChannel} or the network I/O
+ * thread on behalf of a {@link RemoteInputChannel}.
*/
- public boolean publish(ExecutionAttemptID executionId, IntermediateResultPartitionID partitionId, TaskEvent event) {
- EventListener<TaskEvent> listener = registeredWriters.get(executionId, partitionId);
+ public boolean publish(ResultPartitionID partitionId, TaskEvent event) {
+ EventListener<TaskEvent> listener = registeredWriters.get(partitionId);
if (listener != null) {
listener.onEvent(event);
@@ -80,6 +70,9 @@ public class TaskEventDispatcher {
return false;
}
+ /**
+ * Returns the number of currently registered writers.
+ */
int getNumberOfRegisteredWriters() {
synchronized (registeredWriters) {
return registeredWriters.size();
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
index e70b6ee..4ee7fad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
@@ -21,10 +21,10 @@ package org.apache.flink.runtime.io.network.api.reader;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
+import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import java.io.IOException;
@@ -43,6 +43,7 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> extends Abstra
private boolean isFinished;
+ @SuppressWarnings("unchecked")
protected AbstractRecordReader(InputGate inputGate) {
super(inputGate);