You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2014/11/11 11:48:48 UTC

[1/3] incubator-flink git commit: [FLINK-1323] Refactor I/O Manager Readers and Writers to interfaces, add implementation that uses callbacks on completed write requests.

Repository: incubator-flink
Updated Branches:
  refs/heads/master 8e4c772ad -> c9cfe3ba9


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
index d5c5760..78951d3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
@@ -64,7 +64,7 @@ public class IOManagerITCase {
 	@Before
 	public void beforeTest() {
 		memoryManager = new DefaultMemoryManager(NUMBER_OF_SEGMENTS * SEGMENT_SIZE, 1);
-		ioManager = new IOManager();
+		ioManager = new IOManagerAsync();
 	}
 
 	@After
@@ -91,7 +91,7 @@ public class IOManagerITCase {
 		final Random rnd = new Random(SEED);
 		final AbstractInvokable memOwner = new DefaultMemoryManagerTest.DummyInvokable();
 		
-		Channel.ID[] ids = new Channel.ID[NUM_CHANNELS];
+		FileIOChannel.ID[] ids = new FileIOChannel.ID[NUM_CHANNELS];
 		BlockChannelWriter[] writers = new BlockChannelWriter[NUM_CHANNELS];
 		BlockChannelReader[] readers = new BlockChannelReader[NUM_CHANNELS];
 		ChannelWriterOutputView[] outs = new ChannelWriterOutputView[NUM_CHANNELS];

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
index ed3a28a..9c129e6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
@@ -41,7 +41,7 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
-import org.apache.flink.runtime.io.disk.iomanager.Channel;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
 import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -57,8 +57,8 @@ import org.junit.Test;
  *
  *
  */
-public class IOManagerPerformanceBenchmark
-{
+public class IOManagerPerformanceBenchmark {
+	
 	private static final Logger LOG = LoggerFactory.getLogger(IOManagerPerformanceBenchmark.class);
 	
 	private static final int[] SEGMENT_SIZES_ALIGNED = { 4096, 16384, 524288 };
@@ -80,10 +80,9 @@ public class IOManagerPerformanceBenchmark
 	
 	
 	@Before
-	public void startup()
-	{
+	public void startup() {
 		memManager = new DefaultMemoryManager(MEMORY_SIZE,1);
-		ioManager = new IOManager();
+		ioManager = new IOManagerAsync();
 	}
 	
 	@After
@@ -111,7 +110,7 @@ public class IOManagerPerformanceBenchmark
 	private void testChannelWithSegments(int numSegments) throws Exception
 	{
 		final List<MemorySegment> memory = this.memManager.allocatePages(memoryOwner, numSegments);
-		final Channel.ID channel = this.ioManager.createChannel();
+		final FileIOChannel.ID channel = this.ioManager.createChannel();
 		
 		BlockChannelWriter writer = null;
 		BlockChannelReader reader = null;
@@ -246,7 +245,7 @@ public class IOManagerPerformanceBenchmark
 	}
 		
 	private void speedTestStream(int bufferSize) throws IOException {
-		final Channel.ID tmpChannel = ioManager.createChannel();
+		final FileIOChannel.ID tmpChannel = ioManager.createChannel();
 		final IntegerRecord rec = new IntegerRecord(0);
 		
 		File tempFile = null;
@@ -342,7 +341,7 @@ public class IOManagerPerformanceBenchmark
 	@SuppressWarnings("resource")
 	private void speedTestNIO(int bufferSize, boolean direct) throws IOException
 	{
-		final Channel.ID tmpChannel = ioManager.createChannel();
+		final FileIOChannel.ID tmpChannel = ioManager.createChannel();
 		
 		File tempFile = null;
 		FileChannel fs = null;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
index 810b0e2..fc6ea37 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
@@ -26,10 +26,10 @@ import java.util.List;
 import org.junit.Assert;
 
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.BlockChannelAccess;
+import org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
-import org.apache.flink.runtime.io.disk.iomanager.Channel;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.ReadRequest;
 import org.apache.flink.runtime.io.disk.iomanager.WriteRequest;
@@ -39,8 +39,8 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-public class IOManagerTest
-{
+public class IOManagerTest {
+	
 	// ------------------------------------------------------------------------
 	//                        Cross Test Fields
 	// ------------------------------------------------------------------------
@@ -54,15 +54,13 @@ public class IOManagerTest
 	// ------------------------------------------------------------------------
 	
 	@Before
-	public void beforeTest()
-	{
+	public void beforeTest() {
 		this.memoryManager = new DefaultMemoryManager(32 * 1024 * 1024, 1);
-		this.ioManager = new IOManager();
+		this.ioManager = new IOManagerAsync();
 	}
 
 	@After
-	public void afterTest()
-	{
+	public void afterTest() {
 		this.ioManager.shutdown();
 		Assert.assertTrue("IO Manager has not properly shut down.", ioManager.isProperlyShutDown());
 		
@@ -84,10 +82,10 @@ public class IOManagerTest
 	public void channelEnumerator() {
 		File tempPath = new File(System.getProperty("java.io.tmpdir")); 
 		
-		Channel.Enumerator enumerator = ioManager.createChannelEnumerator();
+		FileIOChannel.Enumerator enumerator = ioManager.createChannelEnumerator();
 
 		for (int i = 0; i < 10; i++) {
-			Channel.ID id = enumerator.next();
+			FileIOChannel.ID id = enumerator.next();
 			
 			File path = new File(id.getPath());
 			Assert.assertTrue("Channel IDs must name an absolute path.", path.isAbsolute());
@@ -99,12 +97,11 @@ public class IOManagerTest
 	// ------------------------------------------------------------------------
 	
 	@Test
-	public void channelReadWriteOneSegment()
-	{
+	public void channelReadWriteOneSegment() {
 		final int NUM_IOS = 1111;
 		
 		try {
-			final Channel.ID channelID = this.ioManager.createChannel();
+			final FileIOChannel.ID channelID = this.ioManager.createChannel();
 			final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channelID);
 			
 			MemorySegment memSeg = this.memoryManager.allocatePages(new DummyInvokable(), 1).get(0);
@@ -143,14 +140,13 @@ public class IOManagerTest
 	}
 	
 	@Test
-	public void channelReadWriteMultipleSegments()
-	{
+	public void channelReadWriteMultipleSegments() {
 		final int NUM_IOS = 1111;
 		final int NUM_SEGS = 16;
 		
 		try {
 			final List<MemorySegment> memSegs = this.memoryManager.allocatePages(new DummyInvokable(), NUM_SEGS);
-			final Channel.ID channelID = this.ioManager.createChannel();
+			final FileIOChannel.ID channelID = this.ioManager.createChannel();
 			final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channelID);
 			
 			for (int i = 0; i < NUM_IOS; i++) {
@@ -202,29 +198,26 @@ public class IOManagerTest
 
 	// ============================================================================================
 	
-	final class FailingSegmentReadRequest implements ReadRequest
-	{
-		private final BlockChannelAccess<ReadRequest, ?> channel;
+	final class FailingSegmentReadRequest implements ReadRequest {
+		
+		private final AsynchronousFileIOChannel<ReadRequest> channel;
 		
 		private final MemorySegment segment;
 		
-		protected FailingSegmentReadRequest(BlockChannelAccess<ReadRequest, ?> targetChannel, MemorySegment segment)
-		{
+		protected FailingSegmentReadRequest(AsynchronousFileIOChannel<ReadRequest> targetChannel, MemorySegment segment) {
 			this.channel = targetChannel;
 			this.segment = segment;
 		}
 
 
 		@Override
-		public void read() throws IOException
-		{
+		public void read() throws IOException {
 			throw new TestIOException();
 		}
 
 
 		@Override
-		public void requestDone(IOException ioex)
-		{
+		public void requestDone(IOException ioex) {
 			this.channel.handleProcessedBuffer(this.segment, ioex);
 		}
 	}
@@ -234,36 +227,30 @@ public class IOManagerTest
 	/**
 	 * Special write request that writes an entire memory segment to the block writer.
 	 */
-	final class FailingSegmentWriteRequest implements WriteRequest
-	{
-		private final BlockChannelAccess<WriteRequest, ?> channel;
+	final class FailingSegmentWriteRequest implements WriteRequest {
+		
+		private final AsynchronousFileIOChannel<WriteRequest> channel;
 		
 		private final MemorySegment segment;
 		
-		protected FailingSegmentWriteRequest(BlockChannelAccess<WriteRequest, ?> targetChannel, MemorySegment segment)
-		{
+		protected FailingSegmentWriteRequest(AsynchronousFileIOChannel<WriteRequest> targetChannel, MemorySegment segment) {
 			this.channel = targetChannel;
 			this.segment = segment;
 		}
 
-
 		@Override
-		public void write() throws IOException
-		{
+		public void write() throws IOException {
 			throw new TestIOException();
 		}
 
-
 		@Override
-		public void requestDone(IOException ioex)
-		{
+		public void requestDone(IOException ioex) {
 			this.channel.handleProcessedBuffer(this.segment, ioex);
 		}
 	}
 	
 	
-	final class TestIOException extends IOException
-	{
+	final class TestIOException extends IOException {
 		private static final long serialVersionUID = -814705441998024472L;
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
index 2f30eed..bca6896 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
@@ -16,13 +16,11 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.junit.Assert;
-
 import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
@@ -32,8 +30,9 @@ import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.types.Record;
 import org.junit.Test;
 
-public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record, Record>>
-{
+@SuppressWarnings("deprecation")
+public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record, Record>> {
+	
 	private static final long CROSS_MEM = 1024 * 1024;
 
 	private final double cross_frac;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
index 6e66e72..1e0e882 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
@@ -32,8 +32,9 @@ import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
-public class MatchTaskExternalITCase extends DriverTestBase<FlatJoinFunction<Record, Record, Record>>
-{
+@SuppressWarnings("deprecation")
+public class MatchTaskExternalITCase extends DriverTestBase<FlatJoinFunction<Record, Record, Record>> {
+	
 	private static final long HASH_MEM = 4*1024*1024;
 	
 	private static final long SORT_MEM = 3*1024*1024;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
index a824f77..584bc02 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
@@ -40,8 +40,9 @@ import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
 
-public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Record, Record>>
-{
+@SuppressWarnings("deprecation")
+public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Record, Record>> {
+	
 	private static final long HASH_MEM = 6*1024*1024;
 	
 	private static final long SORT_MEM = 3*1024*1024;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorITCase.java
index 6dfe82b..21e686d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorITCase.java
@@ -36,6 +36,7 @@ import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
 import org.apache.flink.api.common.typeutils.record.RecordSerializer;
 import org.apache.flink.api.java.record.functions.JoinFunction;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -61,7 +62,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-@SuppressWarnings("serial")
+@SuppressWarnings({"serial", "deprecation"})
 public class HashMatchIteratorITCase {
 	
 	private static final int MEMORY_SIZE = 16000000;		// total memory
@@ -104,12 +105,11 @@ public class HashMatchIteratorITCase {
 		this.recordPairPairComparator = new RecordIntPairPairComparator();
 		
 		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
-		this.ioManager = new IOManager();
+		this.ioManager = new IOManagerAsync();
 	}
 
 	@After
-	public void afterTest()
-	{
+	public void afterTest() {
 		if (this.ioManager != null) {
 			this.ioManager.shutdown();
 			if (!this.ioManager.isProperlyShutDown()) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
index 921151a..2cc9892 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.common.typeutils.record.RecordComparator;
 import org.apache.flink.api.common.typeutils.record.RecordSerializer;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
@@ -96,7 +97,7 @@ public class HashTableITCase {
 		this.pairComparator = new IntPairPairComparator();
 		
 		this.memManager = new DefaultMemoryManager(32 * 1024 * 1024,1);
-		this.ioManager = new IOManager();
+		this.ioManager = new IOManagerAsync();
 	}
 	
 	@After
@@ -756,7 +757,7 @@ public class HashTableITCase {
 		}
 		
 		// create the I/O access for spilling
-		final IOManager ioManager = new IOManager();
+		final IOManager ioManager = new IOManagerAsync();
 		
 		// ----------------------------------------------------------------------------------------
 		
@@ -857,7 +858,7 @@ public class HashTableITCase {
 		}
 		
 		// create the I/O access for spilling
-		IOManager ioManager = new IOManager();
+		IOManager ioManager = new IOManagerAsync();
 		
 		// create the map for validating the results
 		HashMap<Integer, Long> map = new HashMap<Integer, Long>(NUM_KEYS);
@@ -972,7 +973,7 @@ public class HashTableITCase {
 		}
 		
 		// create the I/O access for spilling
-		IOManager ioManager = new IOManager();
+		IOManager ioManager = new IOManagerAsync();
 		
 		// create the map for validating the results
 		HashMap<Integer, Long> map = new HashMap<Integer, Long>(NUM_KEYS);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
index 0fbe98a..a8941a4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.operators.hash.AbstractHashTableProber;
 import org.apache.flink.runtime.operators.hash.AbstractMutableHashTable;
 import org.apache.flink.runtime.operators.hash.CompactingHashTable;
@@ -55,7 +56,7 @@ public class HashTablePerformanceComparison {
 	
 	private final TypePairComparator<IntPair, IntPair> pairComparator = new IntPairPairComparator();
 	
-	private IOManager ioManager = new IOManager();
+	private IOManager ioManager = new IOManagerAsync();
 	
 	@Test
 	public void testCompactingHashMapPerformance() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
index c1a906c..04d1a38 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
@@ -38,6 +38,7 @@ import org.apache.flink.api.common.typeutils.record.RecordSerializer;
 import org.apache.flink.api.java.record.functions.JoinFunction;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
@@ -68,7 +69,7 @@ import org.junit.Test;
  * Test specialized hash join that keeps the build side data (in memory and on hard disk)
  * This is used for iterative tasks.
  */
-
+@SuppressWarnings("deprecation")
 public class ReOpenableHashTableITCase {
 	
 	private static final int PAGE_SIZE = 8 * 1024;
@@ -120,7 +121,7 @@ public class ReOpenableHashTableITCase {
 		this.pactRecordComparator = new HashTableITCase.RecordPairComparatorFirstInt();
 		
 		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE,1, PAGE_SIZE);
-		this.ioManager = new IOManager();
+		this.ioManager = new IOManagerAsync();
 	}
 
 	@After

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
index 2bc11f1..4db520e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
@@ -25,6 +25,7 @@ import java.util.NoSuchElementException;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntValueSerializer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -56,7 +57,7 @@ public class SpillingResettableIteratorTest {
 	public void startup() {
 		// set up IO and memory manager
 		this.memman = new DefaultMemoryManager(MEMORY_CAPACITY, 32 * 1024);
-		this.ioman = new IOManager();
+		this.ioman = new IOManagerAsync();
 
 		// create test objects
 		ArrayList<IntValue> objects = new ArrayList<IntValue>(NUM_TESTRECORDS);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java
index 9eaae9a..10d3534 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java
@@ -25,6 +25,7 @@ import org.junit.Assert;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.record.RecordSerializer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -57,7 +58,7 @@ public class SpillingResettableMutableObjectIteratorTest {
 	public void startup() {
 		// set up IO and memory manager
 		this.memman = new DefaultMemoryManager(MEMORY_CAPACITY, 32 * 1024);
-		this.ioman = new IOManager();
+		this.ioman = new IOManagerAsync();
 
 		// create test objects
 		final ArrayList<Record> objects = new ArrayList<Record>(NUM_TESTRECORDS);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
index cbceb8b..852a8f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -80,7 +81,7 @@ public class CombiningUnilateralSortMergerITCase {
 	@Before
 	public void beforeTest() {
 		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
-		this.ioManager = new IOManager();
+		this.ioManager = new IOManagerAsync();
 		
 		this.serializerFactory = RecordSerializerFactory.get();
 		this.comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
index 8e4bbc0..a340ef2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.api.common.typeutils.record.RecordComparator;
 import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -81,7 +82,7 @@ public class ExternalSortITCase {
 	@Before
 	public void beforeTest() {
 		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
-		this.ioManager = new IOManager();
+		this.ioManager = new IOManagerAsync();
 		
 		this.pactRecordSerializer = RecordSerializerFactory.get();
 		this.pactRecordComparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
index 257eb87..9dec847 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
@@ -36,6 +36,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
@@ -85,7 +86,7 @@ public class MassiveStringSortingITCase {
 			
 			try {
 				MemoryManager mm = new DefaultMemoryManager(1024 * 1024, 1);
-				IOManager ioMan = new IOManager();
+				IOManager ioMan = new IOManagerAsync();
 					
 				TypeSerializer<String> serializer = StringSerializer.INSTANCE;
 				TypeComparator<String> comparator = new StringComparator(true);
@@ -175,7 +176,7 @@ public class MassiveStringSortingITCase {
 			
 			try {
 				MemoryManager mm = new DefaultMemoryManager(1024 * 1024, 1);
-				IOManager ioMan = new IOManager();
+				IOManager ioMan = new IOManagerAsync();
 					
 				TupleTypeInfo<Tuple2<String, String[]>> typeInfo = (TupleTypeInfo<Tuple2<String, String[]>>) (TupleTypeInfo<?>) TypeInfoParser.parse("Tuple2<String, String[]>");
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeMatchIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeMatchIteratorITCase.java
index 766d66c..e12c4ac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeMatchIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeMatchIteratorITCase.java
@@ -33,6 +33,7 @@ import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
 import org.apache.flink.api.common.typeutils.record.RecordSerializer;
 import org.apache.flink.api.java.record.functions.JoinFunction;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -53,7 +54,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-
+@SuppressWarnings("deprecation")
 public class SortMergeMatchIteratorITCase {
 	
 	// total memory
@@ -85,8 +86,7 @@ public class SortMergeMatchIteratorITCase {
 
 	@SuppressWarnings("unchecked")
 	@Before
-	public void beforeTest()
-	{
+	public void beforeTest() {
 		this.serializer1 = RecordSerializer.get();
 		this.serializer2 = RecordSerializer.get();
 		this.comparator1 = new RecordComparator(new int[] {0}, new Class[]{TestData.Key.class});
@@ -94,12 +94,11 @@ public class SortMergeMatchIteratorITCase {
 		this.pairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[]{TestData.Key.class});
 		
 		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
-		this.ioManager = new IOManager();
+		this.ioManager = new IOManagerAsync();
 	}
 
 	@After
-	public void afterTest()
-	{
+	public void afterTest() {
 		if (this.ioManager != null) {
 			this.ioManager.shutdown();
 			if (!this.ioManager.isProperlyShutDown()) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index 4d04bf4..02206f6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.typeutils.record.RecordComparator;
 import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -92,7 +93,7 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
 		
 		this.perSortMem = perSortMemory;
 		this.perSortFractionMem = (double)perSortMemory/totalMem;
-		this.ioManager = new IOManager();
+		this.ioManager = new IOManagerAsync();
 		this.memManager = totalMem > 0 ? new DefaultMemoryManager(totalMem,1) : null;
 		
 		this.inputs = new ArrayList<MutableObjectIterator<Record>>();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 5e0edc4..60a81c1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -33,6 +33,7 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.Buffer;
 import org.apache.flink.runtime.io.network.bufferprovider.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
@@ -86,7 +87,7 @@ public class MockEnvironment implements Environment, BufferProvider, LocalBuffer
 		this.outputs = new LinkedList<OutputGate>();
 
 		this.memManager = new DefaultMemoryManager(memorySize, 1);
-		this.ioManager = new IOManager(System.getProperty("java.io.tmpdir"));
+		this.ioManager = new IOManagerAsync();
 		this.inputSplitProvider = inputSplitProvider;
 		this.mockBuffer = new Buffer(new MemorySegment(new byte[bufferSize]), bufferSize, null);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
index 744651d..ba38776 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
 import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
 import org.apache.flink.api.java.record.functions.JoinFunction;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -47,7 +48,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-
+@SuppressWarnings("deprecation")
 public class HashVsSortMiniBenchmark {
 	
 	// total memory
@@ -94,7 +95,7 @@ public class HashVsSortMiniBenchmark {
 		this.pairComparator11 = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[] {TestData.Key.class});
 		
 		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, PAGE_SIZE);
-		this.ioManager = new IOManager();
+		this.ioManager = new IOManagerAsync();
 	}
 
 	@After


[2/3] incubator-flink git commit: [FLINK-1323] Refactor I/O Manager Readers and Writers to interfaces, add implementation that uses callbacks on completed write requests.

Posted by uc...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
index 388773d..f77d9c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
@@ -16,11 +16,9 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.disk.iomanager;
 
 import java.io.IOException;
-import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -32,194 +30,71 @@ import org.apache.flink.core.memory.MemorySegment;
 /**
  * The facade for the provided I/O manager services.
  */
-public class IOManager implements UncaughtExceptionHandler {
+public abstract class IOManager {
 	
 	/** Logging */
-	private static final Logger LOG = LoggerFactory.getLogger(IOManager.class);
+	protected static final Logger LOG = LoggerFactory.getLogger(IOManager.class);
 
-	/**
-	 * The default temp paths for anonymous Channels.
-	 */
+	/** The temporary directories for files */
 	private final String[] paths;
 
-	/**
-	 * A random number generator for the anonymous ChannelIDs.
-	 */
+	/** A random number generator for the anonymous ChannelIDs. */
 	private final Random random;
-
-	/**
-	 * The writer thread used for asynchronous block oriented channel writing.
-	 */
-	private final WriterThread[] writers;
-
-	/**
-	 * The reader threads used for asynchronous block oriented channel reading.
-	 */
-	private final ReaderThread[] readers;
 	
-	/**
-	 * The number of the next path to use.
-	 */
+	/** The number of the next path to use. */
 	private volatile int nextPath;
-
-	/**
-	 * A boolean flag indicating whether the close() has already been invoked.
-	 */
-	private volatile boolean isClosed = false;
-
 	
 	// -------------------------------------------------------------------------
 	//               Constructors / Destructors
 	// -------------------------------------------------------------------------
 
 	/**
-	 * Constructs a new IOManager, writing channels to the system directory.
-	 */
-	public IOManager() {
-		this(System.getProperty("java.io.tmpdir"));
-	}
-	
-	/**
-	 * Constructs a new IOManager.
-	 * 
-	 * @param tempDir The base directory path for files underlying channels.
-	 */
-	public IOManager(String tempDir) {
-		this(new String[] {tempDir});
-	}
-
-	/**
 	 * Constructs a new IOManager.
 	 * 
 	 * @param paths
 	 *        the basic directory paths for files underlying anonymous channels.
 	 */
-	public IOManager(String[] paths) {
+	protected IOManager(String[] paths) {
 		this.paths = paths;
 		this.random = new Random();
 		this.nextPath = 0;
-		
-		// start a write worker thread for each directory
-		this.writers = new WriterThread[paths.length];
-		for (int i = 0; i < this.writers.length; i++) {
-			final WriterThread t = new WriterThread();
-			this.writers[i] = t;
-			t.setName("IOManager writer thread #" + (i + 1));
-			t.setDaemon(true);
-			t.setUncaughtExceptionHandler(this);
-			t.start();
-		}
-
-		// start a reader worker thread for each directory
-		this.readers = new ReaderThread[paths.length];
-		for (int i = 0; i < this.readers.length; i++) {
-			final ReaderThread t = new ReaderThread();
-			this.readers[i] = t;
-			t.setName("IOManager reader thread #" + (i + 1));
-			t.setDaemon(true);
-			t.setUncaughtExceptionHandler(this);
-			t.start();
-		}
 	}
 
 	/**
-	 * Close method. Shuts down the reader and writer threads immediately, not waiting for their
-	 * pending requests to be served. This method waits until the threads have actually ceased their
-	 * operation.
+	 * Close method, marks the I/O manager as closed.
 	 */
-	public synchronized final void shutdown()
-	{
-		if (!this.isClosed) {
-			this.isClosed = true;
-
-			// close writing and reading threads with best effort and log problems
-			
-			// --------------------------------- writer shutdown ----------------------------------			
-			for (int i = 0; i < this.readers.length; i++) {
-				try {
-					this.writers[i].shutdown();
-				}
-				catch (Throwable t) {
-					LOG.error("Error while shutting down IO Manager writer thread.", t);
-				}
-			}
-
-			// --------------------------------- reader shutdown ----------------------------------
-			for (int i = 0; i < this.readers.length; i++) {
-				try {
-					this.readers[i].shutdown();
-				}
-				catch (Throwable t) {
-					LOG.error("Error while shutting down IO Manager reader thread.", t);
-				}
-			}
-			
-			// ------------------------ wait until shutdown is complete ---------------------------
-			try {
-				for (int i = 0; i < this.readers.length; i++) {
-					this.writers[i].join();
-				}
-				for (int i = 0; i < this.readers.length; i++) {
-					this.readers[i].join();
-				}
-			}
-			catch (InterruptedException iex) {}
-		}
-	}
+	public abstract void shutdown();
 	
 	/**
-	 * Utility method to check whether the IO manager has been properly shut down. The IO manager is considered
-	 * to be properly shut down when it is closed and its threads have ceased operation.
+	 * Utility method to check whether the IO manager has been properly shut down.
 	 * 
 	 * @return True, if the IO manager has properly shut down, false otherwise.
 	 */
-	public final boolean isProperlyShutDown()
-	{
-		boolean readersShutDown = true;
-		for (int i = 0; i < this.readers.length; i++) {
-			readersShutDown &= this.readers[i].getState() == Thread.State.TERMINATED;
-		}
-		
-		boolean writersShutDown = true;
-		for (int i = 0; i < this.writers.length; i++) {
-			readersShutDown &= this.writers[i].getState() == Thread.State.TERMINATED;
-		}
-		
-		return this.isClosed && writersShutDown && readersShutDown;
-	}
-
-
-	@Override
-	public void uncaughtException(Thread t, Throwable e) {
-		LOG.error("IO Thread '" + t.getName() + "' terminated due to an exception. Closing I/O Manager.", e);
-		shutdown();	
-	}
+	public abstract boolean isProperlyShutDown();
 
 	// ------------------------------------------------------------------------
 	//                          Channel Instantiations
 	// ------------------------------------------------------------------------
 	
 	/**
-	 * Creates a new {@link Channel.ID} in one of the temp directories. Multiple
+	 * Creates a new {@link FileIOChannel.ID} in one of the temp directories. Multiple
 	 * invocations of this method spread the channels evenly across the different directories.
 	 * 
 	 * @return A channel to a temporary directory.
 	 */
-	public Channel.ID createChannel()
-	{
+	public FileIOChannel.ID createChannel() {
 		final int num = getNextPathNum();
-		return new Channel.ID(this.paths[num], num, this.random);
+		return new FileIOChannel.ID(this.paths[num], num, this.random);
 	}
 
 	/**
-	 * Creates a new {@link Channel.Enumerator}, spreading the channels in a round-robin fashion
+	 * Creates a new {@link FileIOChannel.Enumerator}, spreading the channels in a round-robin fashion
 	 * across the temporary file directories.
 	 * 
 	 * @return An enumerator for channels.
 	 */
-	public Channel.Enumerator createChannelEnumerator()
-	{
-		return new Channel.Enumerator(this.paths, this.random);
+	public FileIOChannel.Enumerator createChannelEnumerator() {
+		return new FileIOChannel.Enumerator(this.paths, this.random);
 	}
 
 	
@@ -228,199 +103,65 @@ public class IOManager implements UncaughtExceptionHandler {
 	// ------------------------------------------------------------------------
 	
 	/**
-	 * Creates a block channel writer that writes to the given channel. The writer writes asynchronously (write-behind),
-	 * accepting write request, carrying them out at some time and returning the written segment to the given queue
-	 * afterwards.
+	 * Creates a block channel writer that writes to the given channel. The writer adds the
+	 * written segment to its return-queue afterwards (to allow for asynchronous implementations).
 	 * 
 	 * @param channelID The descriptor for the channel to write to.
-	 * @param returnQueue The queue to put the written buffers into.
 	 * @return A block channel writer that writes to the given channel.
 	 * @throws IOException Thrown, if the channel for the writer could not be opened.
 	 */
-	public BlockChannelWriter createBlockChannelWriter(Channel.ID channelID,
-								LinkedBlockingQueue<MemorySegment> returnQueue)
-	throws IOException
-	{
-		if (this.isClosed) {
-			throw new IllegalStateException("I/O-Manger is closed.");
-		}
-		
-		return new BlockChannelWriter(channelID, this.writers[channelID.getThreadNum()].requestQueue, returnQueue, 1);
+	public BlockChannelWriter createBlockChannelWriter(FileIOChannel.ID channelID) throws IOException {
+		return createBlockChannelWriter(channelID, new LinkedBlockingQueue<MemorySegment>());
 	}
 	
 	/**
-	 * Creates a block channel writer that writes to the given channel. The writer writes asynchronously (write-behind),
-	 * accepting write request, carrying them out at some time and returning the written segment to the given queue
-	 * afterwards.
-	 * <p>
-	 * The writer will collect a specified number of write requests and carry them out
-	 * in one, effectively writing one block in the size of multiple memory pages.
-	 * Note that this means that no memory segment will reach the return queue before
-	 * the given number of requests are collected, so the number of buffers used with
-	 * the writer should be greater than the number of requests to combine. Ideally,
-	 * the number of memory segments used is a multiple of the number of requests to
-	 * combine.
+	 * Creates a block channel writer that writes to the given channel. The writer adds the
+	 * written segment to the given queue (to allow for asynchronous implementations).
 	 * 
 	 * @param channelID The descriptor for the channel to write to.
 	 * @param returnQueue The queue to put the written buffers into.
-	 * @param numRequestsToCombine The number of write requests to combine to one I/O request.
 	 * @return A block channel writer that writes to the given channel.
 	 * @throws IOException Thrown, if the channel for the writer could not be opened.
 	 */
-	public BlockChannelWriter createBlockChannelWriter(Channel.ID channelID,
-								LinkedBlockingQueue<MemorySegment> returnQueue, int numRequestsToCombine)
-	throws IOException
-	{
-		if (this.isClosed) {
-			throw new IllegalStateException("I/O-Manger is closed.");
-		}
-		
-		return new BlockChannelWriter(channelID, this.writers[channelID.getThreadNum()].requestQueue, returnQueue, numRequestsToCombine);
-	}
+	public abstract BlockChannelWriter createBlockChannelWriter(FileIOChannel.ID channelID,
+				LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException;
 	
 	/**
-	 * Creates a block channel writer that writes to the given channel. The writer writes asynchronously (write-behind),
-	 * accepting write request, carrying them out at some time and returning the written segment its return queue afterwards.
+	 * Creates a block channel writer that writes to the given channel. The writer calls the given callback
+	 * after the I/O operation has been performed (successfully or unsuccessfully), to allow
+	 * for asynchronous implementations.
 	 * 
 	 * @param channelID The descriptor for the channel to write to.
+	 * @param callback The callback to be called for 
 	 * @return A block channel writer that writes to the given channel.
 	 * @throws IOException Thrown, if the channel for the writer could not be opened.
 	 */
-	public BlockChannelWriter createBlockChannelWriter(Channel.ID channelID)
-	throws IOException
-	{
-		if (this.isClosed) {
-			throw new IllegalStateException("I/O-Manger is closed.");
-		}
-		
-		return new BlockChannelWriter(channelID, this.writers[channelID.getThreadNum()].requestQueue, new LinkedBlockingQueue<MemorySegment>(), 1);
-	}
+	public abstract BlockChannelWriterWithCallback createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback callback) throws IOException;
 	
 	/**
-	 * Creates a block channel writer that writes to the given channel. The writer writes asynchronously (write-behind),
-	 * accepting write request, carrying them out at some time and returning the written segment its return queue afterwards.
-	 * <p>
-	 * The writer will collect a specified number of write requests and carry them out
-	 * in one, effectively writing one block in the size of multiple memory pages.
-	 * Note that this means that no memory segment will reach the return queue before
-	 * the given number of requests are collected, so the number of buffers used with
-	 * the writer should be greater than the number of requests to combine. Ideally,
-	 * the number of memory segments used is a multiple of the number of requests to
-	 * combine.
+	 * Creates a block channel reader that reads blocks from the given channel. The reader pushed
+	 * full memory segments (with the read data) to its "return queue", to allow for asynchronous read
+	 * implementations.
 	 * 
 	 * @param channelID The descriptor for the channel to write to.
-	 * @param numRequestsToCombine The number of write requests to combine to one I/O request.
-	 * @return A block channel writer that writes to the given channel.
-	 * @throws IOException Thrown, if the channel for the writer could not be opened.
-	 */
-	public BlockChannelWriter createBlockChannelWriter(Channel.ID channelID, int numRequestsToCombine)
-	throws IOException
-	{
-		if (this.isClosed) {
-			throw new IllegalStateException("I/O-Manger is closed.");
-		}
-		
-		return new BlockChannelWriter(channelID, this.writers[channelID.getThreadNum()].requestQueue, new LinkedBlockingQueue<MemorySegment>(), numRequestsToCombine);
-	}
-	
-	/**
-	 * Creates a block channel reader that reads blocks from the given channel. The reader reads asynchronously,
-	 * such that a read request is accepted, carried out at some (close) point in time, and the full segment
-	 * is pushed to the given queue.
-	 * 
-	 * @param channelID The descriptor for the channel to write to.
-	 * @param returnQueue The queue to put the full buffers into.
 	 * @return A block channel reader that reads from the given channel.
 	 * @throws IOException Thrown, if the channel for the reader could not be opened.
 	 */
-	public BlockChannelReader createBlockChannelReader(Channel.ID channelID,
-										LinkedBlockingQueue<MemorySegment> returnQueue)
-	throws IOException
-	{
-		if (this.isClosed) {
-			throw new IllegalStateException("I/O-Manger is closed.");
-		}
-		
-		return new BlockChannelReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, returnQueue, 1);
+	public BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID) throws IOException {
+		return createBlockChannelReader(channelID, new LinkedBlockingQueue<MemorySegment>());
 	}
 	
 	/**
-	 * Creates a block channel reader that reads blocks from the given channel. The reader reads asynchronously,
-	 * such that a read request is accepted, carried out at some (close) point in time, and the full segment
-	 * is pushed to the given queue.
-	 * <p>
-	 * The reader will collect a specified number of read requests and carry them out
-	 * in one, effectively reading one block in the size of multiple memory pages.
-	 * Note that this means that no memory segment will reach the return queue before
-	 * the given number of requests are collected, so the number of buffers used with
-	 * the reader should be greater than the number of requests to combine. Ideally,
-	 * the number of memory segments used is a multiple of the number of requests to
-	 * combine.
+	 * Creates a block channel reader that reads blocks from the given channel. The reader pushes the full segments
+	 * to the given queue, to allow for asynchronous implementations.
 	 * 
 	 * @param channelID The descriptor for the channel to write to.
 	 * @param returnQueue The queue to put the full buffers into.
-	 * @param numRequestsToCombine The number of read requests to combine to one I/O request.
-	 * @return A block channel reader that reads from the given channel.
-	 * @throws IOException Thrown, if the channel for the reader could not be opened.
-	 */
-	public BlockChannelReader createBlockChannelReader(Channel.ID channelID,
-					LinkedBlockingQueue<MemorySegment> returnQueue, int numRequestsToCombine)
-	throws IOException
-	{
-		if (this.isClosed) {
-			throw new IllegalStateException("I/O-Manger is closed.");
-		}
-		
-		return new BlockChannelReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, returnQueue, numRequestsToCombine);
-	}
-	
-	/**
-	 * Creates a block channel reader that reads blocks from the given channel. The reader reads asynchronously,
-	 * such that a read request is accepted, carried out at some (close) point in time, and the full segment
-	 * is pushed to the reader's return queue.
-	 * 
-	 * @param channelID The descriptor for the channel to write to.
 	 * @return A block channel reader that reads from the given channel.
 	 * @throws IOException Thrown, if the channel for the reader could not be opened.
 	 */
-	public BlockChannelReader createBlockChannelReader(Channel.ID channelID)
-	throws IOException
-	{
-		if (this.isClosed) {
-			throw new IllegalStateException("I/O-Manger is closed.");
-		}
-		
-		return new BlockChannelReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, new LinkedBlockingQueue<MemorySegment>(), 1);
-	}
-	
-	/**
-	 * Creates a block channel reader that reads blocks from the given channel. The reader reads asynchronously,
-	 * such that a read request is accepted, carried out at some (close) point in time, and the full segment
-	 * is pushed to the reader's return queue.
-	 * <p>
-	 * The reader will collect a specified number of read requests and carry them out
-	 * in one, effectively reading one block in the size of multiple memory pages.
-	 * Note that this means that no memory segment will reach the return queue before
-	 * the given number of requests are collected, so the number of buffers used with
-	 * the reader should be greater than the number of requests to combine. Ideally,
-	 * the number of memory segments used is a multiple of the number of requests to
-	 * combine.
-	 * 
-	 * @param channelID The descriptor for the channel to write to.
-	 * @param numRequestsToCombine The number of write requests to combine to one I/O request.
-	 * @return A block channel reader that reads from the given channel.
-	 * @throws IOException Thrown, if the channel for the reader could not be opened.
-	 */
-	public BlockChannelReader createBlockChannelReader(Channel.ID channelID, int numRequestsToCombine)
-	throws IOException
-	{
-		if (this.isClosed) {
-			throw new IllegalStateException("I/O-Manger is closed.");
-		}
-		
-		return new BlockChannelReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, 
-			new LinkedBlockingQueue<MemorySegment>(), numRequestsToCombine);
-	}
+	public abstract BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID, 
+										LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException;
 	
 	/**
 	 * Creates a block channel reader that reads all blocks from the given channel directly in one bulk.
@@ -437,217 +178,17 @@ public class IOManager implements UncaughtExceptionHandler {
 	 * @return A block channel reader that reads from the given channel.
 	 * @throws IOException Thrown, if the channel for the reader could not be opened.
 	 */
-	public BulkBlockChannelReader createBulkBlockChannelReader(Channel.ID channelID,
-			List<MemorySegment> targetSegments,	int numBlocks)
-	throws IOException
-	{
-		if (this.isClosed) {
-			throw new IllegalStateException("I/O-Manger is closed.");
-		}
-		
-		return new BulkBlockChannelReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, targetSegments, numBlocks);
-	}
+	public abstract BulkBlockChannelReader createBulkBlockChannelReader(FileIOChannel.ID channelID, 
+			List<MemorySegment> targetSegments, int numBlocks) throws IOException;
 	
-	// ========================================================================
-	//                             Utilities
-	// ========================================================================
+	// ------------------------------------------------------------------------
+	//                          Utilities
+	// ------------------------------------------------------------------------
 	
-	private final int getNextPathNum()
-	{
+	protected int getNextPathNum() {
 		final int next = this.nextPath;
 		final int newNext = next + 1;
 		this.nextPath = newNext >= this.paths.length ? 0 : newNext;
 		return next;
 	}
-	
-	
-	// ========================================================================
-	//                          I/O Worker Threads
-	// ========================================================================
-
-	/**
-	 * A worker thread for asynchronous read.
-	 * 
-	 */
-	private static final class ReaderThread extends Thread
-	{
-		protected final RequestQueue<ReadRequest> requestQueue;
-
-		private volatile boolean alive;
-
-		// ---------------------------------------------------------------------
-		// Constructors / Destructors
-		// ---------------------------------------------------------------------
-		
-		protected ReaderThread()
-		{
-			this.requestQueue = new RequestQueue<ReadRequest>();
-			this.alive = true;
-		}
-		
-		/**
-		 * Shuts the thread down. This operation does not wait for all pending requests to be served, halts the thread
-		 * immediately. All buffers of pending requests are handed back to their channel readers and an exception is
-		 * reported to them, declaring their request queue as closed.
-		 */
-		protected void shutdown()
-		{
-			if (this.alive) {
-				// shut down the thread
-				try {
-					this.alive = false;
-					this.requestQueue.close();
-					this.interrupt();
-				}
-				catch (Throwable t) {}
-			}
-			
-			// notify all pending write requests that the thread has been shut down
-			IOException ioex = new IOException("IO-Manager has been closed.");
-			
-			while (!this.requestQueue.isEmpty()) {
-				ReadRequest request = this.requestQueue.poll();
-				request.requestDone(ioex);
-			}
-		}
-
-		// ---------------------------------------------------------------------
-		//                             Main loop
-		// ---------------------------------------------------------------------
-
-		@Override
-		public void run()
-		{
-			while (this.alive)
-			{
-				
-				// get the next buffer. ignore interrupts that are not due to a shutdown.
-				ReadRequest request = null;
-				while (request == null) {
-					try {
-						request = this.requestQueue.take();
-					}
-					catch (InterruptedException iex) {
-						if (!this.alive) {
-							// exit
-							return;
-						}
-					}
-				}
-				
-				// remember any IO exception that occurs, so it can be reported to the writer
-				IOException ioex = null;
-
-				try {
-					// read buffer from the specified channel
-					request.read();
-				}
-				catch (IOException e) {
-					ioex = e;
-				}
-				catch (Throwable t) {
-					ioex = new IOException("The buffer could not be read: " + t.getMessage(), t);
-					IOManager.LOG.error("I/O reading thread encountered an error" + 
-						t.getMessage() == null ? "." : ": ", t);
-				}
-
-				// invoke the processed buffer handler of the request issuing reader object
-				request.requestDone(ioex);
-			} // end while alive
-		}
-		
-	} // end reading thread
-	
-	/**
-	 * A worker thread that asynchronously writes the buffers to disk.
-	 */
-	private static final class WriterThread extends Thread
-	{
-		protected final RequestQueue<WriteRequest> requestQueue;
-
-		private volatile boolean alive;
-
-		// ---------------------------------------------------------------------
-		// Constructors / Destructors
-		// ---------------------------------------------------------------------
-
-		protected WriterThread()
-		{
-			this.requestQueue = new RequestQueue<WriteRequest>();
-			this.alive = true;
-		}
-
-		/**
-		 * Shuts the thread down. This operation does not wait for all pending requests to be served, halts the thread
-		 * immediately. All buffers of pending requests are handed back to their channel writers and an exception is
-		 * reported to them, declaring their request queue as closed.
-		 */
-		protected void shutdown()
-		{
-			if (this.alive) {
-				// shut down the thread
-				try {
-					this.alive = false;
-					this.requestQueue.close();
-					this.interrupt();
-				}
-				catch (Throwable t) {}
-				
-				// notify all pending write requests that the thread has been shut down
-				IOException ioex = new IOException("Writer thread has been closed.");
-				
-				while (!this.requestQueue.isEmpty())
-				{
-					WriteRequest request = this.requestQueue.poll();
-					request.requestDone(ioex);
-				}
-			}
-		}
-
-		// ---------------------------------------------------------------------
-		// Main loop
-		// ---------------------------------------------------------------------
-
-		@Override
-		public void run()
-		{
-			while (this.alive) {
-				
-				WriteRequest request = null;
-				
-				// get the next buffer. ignore interrupts that are not due to a shutdown.
-				while (request == null) {
-					try {
-						request = requestQueue.take();
-					}
-					catch (InterruptedException iex) {
-						if (!this.alive) {
-							// exit
-							return;
-						}
-					}
-				}
-				
-				// remember any IO exception that occurs, so it can be reported to the writer
-				IOException ioex = null;
-				
-				try {
-					// write buffer to the specified channel
-					request.write();
-				}
-				catch (IOException e) {
-					ioex = e;
-				}
-				catch (Throwable t) {
-					ioex = new IOException("The buffer could not be written: " + t.getMessage(), t);
-					IOManager.LOG.error("I/O reading thread encountered an error" + 
-						t.getMessage() == null ? "." : ": ", t);
-				}
-
-				// invoke the processed buffer handler of the request issuing writer object
-				request.requestDone(ioex);
-			} // end while alive
-		}
-		
-	}; // end writer thread
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
new file mode 100644
index 0000000..1f79067
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A version of the {@link IOManager} that uses asynchronous I/O.
+ */
+public class IOManagerAsync extends IOManager implements UncaughtExceptionHandler {
+	
+	/** The writer threads used for asynchronous block oriented channel writing. */
+	private final WriterThread[] writers;
+
+	/** The reader threads used for asynchronous block oriented channel reading. */
+	private final ReaderThread[] readers;
+	
+	/** Lock object to guard shutdown */
+	private final Object shutdownLock = new Object();
+	
+	/** Flag to mark the I/O manager as alive or shut down */
+	private volatile boolean shutdown;
+	
+	// -------------------------------------------------------------------------
+	//               Constructors / Destructors
+	// -------------------------------------------------------------------------
+
+	/**
+	 * Constructs a new asynchronous I/O manger, writing files to the system 's temp directory.
+	 */
+	public IOManagerAsync() {
+		this(EnvironmentInformation.getTemporaryFileDirectory());
+	}
+	
+	/**
+	 * Constructs a new asynchronous I/O manger, writing file to the given directory.
+	 * 
+	 * @param tempDir The directory to write temporary files to.
+	 */
+	public IOManagerAsync(String tempDir) {
+		this(new String[] {tempDir});
+	}
+
+	/**
+	 * Constructs a new asynchronous I/O manger, writing file round robin across the given directories.
+	 * 
+	 * @param tempDirs The directories to write temporary files to.
+	 */
+	public IOManagerAsync(String[] tempDirs) {
+		super(tempDirs);
+		
+		// start a write worker thread for each directory
+		this.writers = new WriterThread[tempDirs.length];
+		for (int i = 0; i < this.writers.length; i++) {
+			final WriterThread t = new WriterThread();
+			this.writers[i] = t;
+			t.setName("IOManager writer thread #" + (i + 1));
+			t.setDaemon(true);
+			t.setUncaughtExceptionHandler(this);
+			t.start();
+		}
+
+		// start a reader worker thread for each directory
+		this.readers = new ReaderThread[tempDirs.length];
+		for (int i = 0; i < this.readers.length; i++) {
+			final ReaderThread t = new ReaderThread();
+			this.readers[i] = t;
+			t.setName("IOManager reader thread #" + (i + 1));
+			t.setDaemon(true);
+			t.setUncaughtExceptionHandler(this);
+			t.start();
+		}
+	}
+
+	/**
+	 * Close method. Shuts down the reader and writer threads immediately, not waiting for their
+	 * pending requests to be served. This method waits until the threads have actually ceased their
+	 * operation.
+	 */
+	@Override
+	public void shutdown() {
+		synchronized (shutdownLock) {
+			if (shutdown) {
+				return;
+			}
+			
+			shutdown = true;
+			
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Shutting down I/O manager.");
+			}
+
+			// close writing and reading threads with best effort and log problems
+			// first notify all to close, then wait until all are closed
+
+			for (WriterThread wt : writers) {
+				try {
+					wt.shutdown();
+				}
+				catch (Throwable t) {
+					LOG.error("Error while shutting down IO Manager writer thread.", t);
+				}
+			}
+			for (ReaderThread rt : readers) {
+				try {
+					rt.shutdown();
+				}
+				catch (Throwable t) {
+					LOG.error("Error while shutting down IO Manager reader thread.", t);
+				}
+			}
+			try {
+				for (WriterThread wt : writers) {
+					wt.join();
+				}
+				for (ReaderThread rt : readers) {
+					rt.join();
+				}
+			}
+			catch (InterruptedException iex) {}
+		}
+	}
+	
+	/**
+	 * Utility method to check whether the IO manager has been properly shut down. The IO manager is considered
+	 * to be properly shut down when it is closed and its threads have ceased operation.
+	 * 
+	 * @return True, if the IO manager has properly shut down, false otherwise.
+	 */
+	@Override
+	public boolean isProperlyShutDown() {
+		boolean readersShutDown = true;
+		for (ReaderThread rt : readers) {
+			readersShutDown &= rt.getState() == Thread.State.TERMINATED;
+		}
+		
+		boolean writersShutDown = true;
+		for (WriterThread wt : writers) {
+			readersShutDown &= wt.getState() == Thread.State.TERMINATED;
+		}
+		
+		return shutdown && writersShutDown && readersShutDown;
+	}
+
+
+	@Override
+	public void uncaughtException(Thread t, Throwable e) {
+		LOG.error("IO Thread '" + t.getName() + "' terminated due to an exception. Closing I/O Manager.", e);
+		shutdown();	
+	}
+	
+	// ------------------------------------------------------------------------
+	//                        Reader / Writer instantiations
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public BlockChannelWriter createBlockChannelWriter(FileIOChannel.ID channelID,
+								LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException
+	{
+		Preconditions.checkState(!shutdown, "I/O-Manger is closed.");
+		return new AsynchronousBlockWriter(channelID, this.writers[channelID.getThreadNum()].requestQueue, returnQueue);
+	}
+	
+	@Override
+	public BlockChannelWriterWithCallback createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback callback) throws IOException {
+		Preconditions.checkState(!shutdown, "I/O-Manger is closed.");
+		return new AsynchronousBlockWriterWithCallback(channelID, this.writers[channelID.getThreadNum()].requestQueue, callback);
+	}
+	
+	/**
+	 * Creates a block channel reader that reads blocks from the given channel. The reader reads asynchronously,
+	 * such that a read request is accepted, carried out at some (close) point in time, and the full segment
+	 * is pushed to the given queue.
+	 * 
+	 * @param channelID The descriptor for the channel to write to.
+	 * @param returnQueue The queue to put the full buffers into.
+	 * @return A block channel reader that reads from the given channel.
+	 * @throws IOException Thrown, if the channel for the reader could not be opened.
+	 */
+	@Override
+	public BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID,
+										LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException
+	{
+		Preconditions.checkState(!shutdown, "I/O-Manger is closed.");
+		return new AsynchronousBlockReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, returnQueue);
+	}
+	
+	/**
+	 * Creates a block channel reader that reads all blocks from the given channel directly in one bulk.
+	 * The reader draws segments to read the blocks into from a supplied list, which must contain as many
+	 * segments as the channel has blocks. After the reader is done, the list with the full segments can be 
+	 * obtained from the reader.
+	 * <p>
+	 * If a channel is not to be read in one bulk, but in multiple smaller batches, a  
+	 * {@link BlockChannelReader} should be used.
+	 * 
+	 * @param channelID The descriptor for the channel to write to.
+	 * @param targetSegments The list to take the segments from into which to read the data.
+	 * @param numBlocks The number of blocks in the channel to read.
+	 * @return A block channel reader that reads from the given channel.
+	 * @throws IOException Thrown, if the channel for the reader could not be opened.
+	 */
+	@Override
+	public BulkBlockChannelReader createBulkBlockChannelReader(FileIOChannel.ID channelID,
+			List<MemorySegment> targetSegments,	int numBlocks) throws IOException
+	{
+		Preconditions.checkState(!shutdown, "I/O-Manger is closed.");
+		return new AsynchronousBulkBlockReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, targetSegments, numBlocks);
+	}
+
+	// -------------------------------------------------------------------------
+	//                           I/O Worker Threads
+	// -------------------------------------------------------------------------
+	
+	/**
+	 * A worker thread for asynchronous reads.
+	 */
+	private static final class ReaderThread extends Thread {
+		
+		protected final RequestQueue<ReadRequest> requestQueue;
+
+		private volatile boolean alive;
+
+		// ---------------------------------------------------------------------
+		// Constructors / Destructors
+		// ---------------------------------------------------------------------
+		
+		protected ReaderThread() {
+			this.requestQueue = new RequestQueue<ReadRequest>();
+			this.alive = true;
+		}
+		
+		/**
+		 * Shuts the thread down. This operation does not wait for all pending requests to be served, halts the thread
+		 * immediately. All buffers of pending requests are handed back to their channel readers and an exception is
+		 * reported to them, declaring their request queue as closed.
+		 */
+		protected void shutdown() {
+			synchronized (this) {
+				if (alive) {
+					alive = false;
+					requestQueue.close();
+					interrupt();
+				}
+
+				try {
+					join(1000);
+				}
+				catch (InterruptedException e) {}
+				
+				// notify all pending write requests that the thread has been shut down
+				IOException ioex = new IOException("IO-Manager has been closed.");
+					
+				while (!this.requestQueue.isEmpty()) {
+					ReadRequest request = this.requestQueue.poll();
+					if (request != null) {
+						try {
+							request.requestDone(ioex);
+						}
+						catch (Throwable t) {
+							IOManagerAsync.LOG.error("The handler of the request complete callback threw an exception"
+									+ (t.getMessage() == null ? "." : ": " + t.getMessage()), t);
+						}
+					}
+				}
+			}
+		}
+
+		// ---------------------------------------------------------------------
+		//                             Main loop
+		// ---------------------------------------------------------------------
+
+		@Override
+		public void run() {
+			
+			while (alive) {
+				
+				// get the next buffer. ignore interrupts that are not due to a shutdown.
+				ReadRequest request = null;
+				while (alive && request == null) {
+					try {
+						request = this.requestQueue.take();
+					}
+					catch (InterruptedException e) {
+						if (!this.alive) {
+							return;
+						} else {
+							IOManagerAsync.LOG.warn(Thread.currentThread() + " was interrupted without shutdown.");
+						}
+					}
+				}
+				
+				// remember any IO exception that occurs, so it can be reported to the writer
+				IOException ioex = null;
+
+				try {
+					// read buffer from the specified channel
+					request.read();
+				}
+				catch (IOException e) {
+					ioex = e;
+				}
+				catch (Throwable t) {
+					ioex = new IOException("The buffer could not be read: " + t.getMessage(), t);
+					IOManagerAsync.LOG.error("I/O reading thread encountered an error" + (t.getMessage() == null ? "." : ": " + t.getMessage()), t);
+				}
+
+				// invoke the processed buffer handler of the request issuing reader object
+				try {
+					request.requestDone(ioex);
+				}
+				catch (Throwable t) {
+					IOManagerAsync.LOG.error("The handler of the request-complete-callback threw an exception" + (t.getMessage() == null ? "." : ": " + t.getMessage()), t);
+				}
+			} // end while alive
+		}
+		
+	} // end reading thread
+	
+	/**
+	 * A worker thread that asynchronously writes the buffers to disk.
+	 */
+	private static final class WriterThread extends Thread {
+		
+		protected final RequestQueue<WriteRequest> requestQueue;
+
+		private volatile boolean alive;
+
+		// ---------------------------------------------------------------------
+		// Constructors / Destructors
+		// ---------------------------------------------------------------------
+
+		protected WriterThread() {
+			this.requestQueue = new RequestQueue<WriteRequest>();
+			this.alive = true;
+		}
+
+		/**
+		 * Shuts the thread down. This operation does not wait for all pending requests to be served, halts the thread
+		 * immediately. All buffers of pending requests are handed back to their channel writers and an exception is
+		 * reported to them, declaring their request queue as closed.
+		 */
+		protected void shutdown() {
+			synchronized (this) {
+				if (alive) {
+					alive = false;
+					requestQueue.close();
+					interrupt();
+				}
+
+				try {
+					join(1000);
+				}
+				catch (InterruptedException e) {}
+				
+				// notify all pending write requests that the thread has been shut down
+				IOException ioex = new IOException("IO-Manager has been closed.");
+					
+				while (!this.requestQueue.isEmpty()) {
+					WriteRequest request = this.requestQueue.poll();
+					if (request != null) {
+						try {
+							request.requestDone(ioex);
+						}
+						catch (Throwable t) {
+							IOManagerAsync.LOG.error("The handler of the request complete callback threw an exception"
+									+ (t.getMessage() == null ? "." : ": " + t.getMessage()), t);
+						}
+					}
+				}
+			}
+		}
+
+		// ---------------------------------------------------------------------
+		// Main loop
+		// ---------------------------------------------------------------------
+
+		@Override
+		public void run() {
+			
+			while (this.alive) {
+				
+				WriteRequest request = null;
+				
+				// get the next buffer. ignore interrupts that are not due to a shutdown.
+				while (alive && request == null) {
+					try {
+						request = requestQueue.take();
+					}
+					catch (InterruptedException e) {
+						if (!this.alive) {
+							return;
+						} else {
+							IOManagerAsync.LOG.warn(Thread.currentThread() + " was interrupted without shutdown.");
+						}
+					}
+				}
+				
+				// remember any IO exception that occurs, so it can be reported to the writer
+				IOException ioex = null;
+				
+				try {
+					// write buffer to the specified channel
+					request.write();
+				}
+				catch (IOException e) {
+					ioex = e;
+				}
+				catch (Throwable t) {
+					ioex = new IOException("The buffer could not be written: " + t.getMessage(), t);
+					IOManagerAsync.LOG.error("I/O writing thread encountered an error" + (t.getMessage() == null ? "." : ": " + t.getMessage()), t);
+				}
+
+				// invoke the processed buffer handler of the request issuing writer object
+				try {
+					request.requestDone(ioex);
+				}
+				catch (Throwable t) {
+					IOManagerAsync.LOG.error("The handler of the request-complete-callback threw an exception" + (t.getMessage() == null ? "." : ": " + t.getMessage()), t);
+				}
+			} // end while alive
+		}
+		
+	}; // end writer thread
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java
index d24e8c7..1b7adae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java
@@ -16,23 +16,19 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.disk.iomanager;
 
 import java.io.IOException;
 
-
 /**
  * Basic interface that I/O requests that are sent to the threads of the I/O manager need to implement.
- *
  */
-interface IORequest
-{
+interface IORequest {
+	
 	/**
 	 * Method that is called by the target I/O thread after the request has been processed.
 	 * 
-	 * @param ioex The exception that occurred while processing the I/O request. Is <tt>null</tt> if everything
-	 *             was fine.
+	 * @param ioex The exception that occurred while processing the I/O request. Is <tt>null</tt> if everything was fine.
 	 */
 	public void requestDone(IOException ioex);
 }
@@ -41,8 +37,8 @@ interface IORequest
 /**
  * Interface for I/O requests that are handled by the IOManager's reading thread. 
  */
-interface ReadRequest extends IORequest
-{
+interface ReadRequest extends IORequest {
+	
 	/**
 	 * Called by the target I/O thread to perform the actual reading operation.
 	 * 
@@ -55,8 +51,8 @@ interface ReadRequest extends IORequest
 /**
  * Interface for I/O requests that are handled by the IOManager's writing thread.
  */
-interface WriteRequest extends IORequest
-{
+interface WriteRequest extends IORequest {
+	
 	/**
 	 * Called by the target I/O thread to perform the actual writing operation.
 	 * 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java
new file mode 100644
index 0000000..78699e2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+/**
+ * A {@link RequestDoneCallback} that adds the memory segments to a blocking queue.
+ */
+public class QueuingCallback implements RequestDoneCallback {
+
+	private final LinkedBlockingQueue<MemorySegment> queue;
+	
+	public QueuingCallback(LinkedBlockingQueue<MemorySegment> queue) {
+		this.queue = queue;
+	}
+
+	@Override
+	public void requestSuccessful(MemorySegment buffer) {
+		queue.add(buffer);
+	}
+
+	@Override
+	public void requestFailed(MemorySegment buffer, IOException e) {
+		// the I/O error is recorded in the writer already
+		queue.add(buffer);
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestDoneCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestDoneCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestDoneCallback.java
new file mode 100644
index 0000000..982343c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestDoneCallback.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+/**
+ * Callback to be executed on completion of an asynchronous I/O request.
+ * Depending on success or failure, either method of
+ * {@ink #requestSuccessful(MemorySegment)} or {@link #requestFailed(MemorySegment, IOException)}
+ * is called.
+ */
+public interface RequestDoneCallback {
+
+	void requestSuccessful(MemorySegment buffer);
+	
+	void requestFailed(MemorySegment buffer, IOException e);
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestQueue.java
index b506380..230bf50 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestQueue.java
@@ -16,29 +16,19 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.disk.iomanager;
 
-
 import java.io.Closeable;
-import java.io.IOException;
 import java.util.concurrent.LinkedBlockingQueue;
 
-
 /**
  * A {@link LinkedBlockingQueue} that is extended with closing methods.
- *
  */
-public final class RequestQueue<E> extends LinkedBlockingQueue<E> implements Closeable
-{
-	/**
-	 * UID for serialization interoperability. 
-	 */
+public final class RequestQueue<E> extends LinkedBlockingQueue<E> implements Closeable {
+	
 	private static final long serialVersionUID = 3804115535778471680L;
 	
-	/**
-	 * Flag marking this queue as closed.
-	 */
+	/** Flag marking this queue as closed. */
 	private volatile boolean closed = false;
 	
 	/**
@@ -47,7 +37,7 @@ public final class RequestQueue<E> extends LinkedBlockingQueue<E> implements Clo
 	 * @see java.io.Closeable#close()
 	 */
 	@Override
-	public void close() throws IOException {
+	public void close() {
 		this.closed = true;
 	}
 	
@@ -56,9 +46,7 @@ public final class RequestQueue<E> extends LinkedBlockingQueue<E> implements Clo
 	 * 
 	 * @return True, if the queue is closed, false otherwise.
 	 */
-	public boolean isClosed()
-	{
+	public boolean isClosed() {
 		return this.closed;
 	}
-	
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousFileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousFileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousFileIOChannel.java
new file mode 100644
index 0000000..fd6c230
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousFileIOChannel.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+
+/**
+ * A base class for synchronous readers and writers.
+ */
+public abstract class SynchronousFileIOChannel extends AbstractFileIOChannel {
+	
+	protected SynchronousFileIOChannel(FileIOChannel.ID channelID, boolean writeEnabled) throws IOException {
+		super(channelID, writeEnabled);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public boolean isClosed() {
+		return !this.fileChannel.isOpen();
+	}
+	
+	@Override
+	public void close() throws IOException {
+		if (this.fileChannel.isOpen()) {
+			this.fileChannel.close();
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
index b5504f3..a02e81c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
@@ -30,7 +30,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
-import org.apache.flink.runtime.io.disk.iomanager.Channel;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memorymanager.AbstractPagedInputView;
 import org.apache.flink.runtime.memorymanager.AbstractPagedOutputView;
@@ -49,7 +49,7 @@ public class SerializedUpdateBuffer extends AbstractPagedOutputView {
 
 	private final IOManager ioManager;
 
-	private final Channel.Enumerator channelEnumerator;
+	private final FileIOChannel.Enumerator channelEnumerator;
 
 	private final int numSegmentsSpillingThreshold;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
index 020da9d..6d3194b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
@@ -33,7 +33,7 @@ import org.apache.flink.core.memory.SeekableDataInputView;
 import org.apache.flink.core.memory.SeekableDataOutputView;
 import org.apache.flink.runtime.io.disk.RandomAccessOutputView;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
-import org.apache.flink.runtime.io.disk.iomanager.Channel;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memorymanager.AbstractPagedInputView;
@@ -282,7 +282,7 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 	 * @return The number of buffers that were freed by spilling this partition.
 	 * @throws IOException Thrown, if the writing failed.
 	 */
-	public int spillPartition(List<MemorySegment> target, IOManager ioAccess, Channel.ID targetChannel,
+	public int spillPartition(List<MemorySegment> target, IOManager ioAccess, FileIOChannel.ID targetChannel,
 			LinkedBlockingQueue<MemorySegment> bufferReturnQueue)
 	throws IOException
 	{
@@ -311,7 +311,7 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 		return this.buildSideWriteBuffer.spill(this.buildSideChannel);
 	}
 	
-	public void finalizeBuildPhase(IOManager ioAccess, Channel.Enumerator probeChannelEnumerator,
+	public void finalizeBuildPhase(IOManager ioAccess, FileIOChannel.Enumerator probeChannelEnumerator,
 			LinkedBlockingQueue<MemorySegment> bufferReturnQueue)
 	throws IOException
 	{
@@ -449,7 +449,7 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 	//                   ReOpenableHashTable related methods
 	// --------------------------------------------------------------------------------------------------
 	
-	public void prepareProbePhase(IOManager ioAccess, Channel.Enumerator probeChannelEnumerator,
+	public void prepareProbePhase(IOManager ioAccess, FileIOChannel.Enumerator probeChannelEnumerator,
 			LinkedBlockingQueue<MemorySegment> bufferReturnQueue) throws IOException {
 		if (isInMemory()) {
 			return;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
index 21bd8df..1bbf246 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
@@ -37,7 +37,7 @@ import org.apache.flink.core.memory.SeekableDataOutputView;
 import org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
 import org.apache.flink.runtime.io.disk.iomanager.BulkBlockChannelReader;
-import org.apache.flink.runtime.io.disk.iomanager.Channel;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
 import org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -306,7 +306,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 	 * The channel enumerator that is used while processing the current partition to create
 	 * channels for the spill partitions it requires.
 	 */
-	protected Channel.Enumerator currentEnumerator;
+	protected FileIOChannel.Enumerator currentEnumerator;
 	
 	/**
 	 * The array of memory segments that contain the buckets which form the actual hash-table

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableHashPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableHashPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableHashPartition.java
index 3695aea..56dcfae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableHashPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableHashPartition.java
@@ -28,14 +28,14 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentSource;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
 import org.apache.flink.runtime.io.disk.iomanager.BulkBlockChannelReader;
-import org.apache.flink.runtime.io.disk.iomanager.Channel;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 
 public class ReOpenableHashPartition<BT, PT> extends HashPartition<BT, PT> {
 
 	protected int initialPartitionBuffersCount = -1; 						// stores the number of buffers used for an in-memory partition after the build phase has finished.
 
-	private Channel.ID initialBuildSideChannel = null;			// path to initial build side contents (only for in-memory partitions)
+	private FileIOChannel.ID initialBuildSideChannel = null;			// path to initial build side contents (only for in-memory partitions)
 	
 	private BlockChannelWriter initialBuildSideWriter = null;
 
@@ -97,7 +97,7 @@ public class ReOpenableHashPartition<BT, PT> extends HashPartition<BT, PT> {
 	 * 
 	 * @return Number of memorySegments in the writeBehindBuffers!
 	 */
-	int spillInMemoryPartition(Channel.ID targetChannel, IOManager ioManager, LinkedBlockingQueue<MemorySegment> writeBehindBuffers) throws IOException {
+	int spillInMemoryPartition(FileIOChannel.ID targetChannel, IOManager ioManager, LinkedBlockingQueue<MemorySegment> writeBehindBuffers) throws IOException {
 		this.initialPartitionBuffersCount = partitionBuffers.length; // for ReOpenableHashMap
 		this.initialBuildSideChannel = targetChannel;
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java
index 5099f38..6819924 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.Channel;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -36,7 +36,7 @@ public class ReOpenableMutableHashTable<BT, PT> extends MutableHashTable<BT, PT>
 	/**
 	 * Channel for the spilled partitions
 	 */
-	private final Channel.Enumerator spilledInMemoryPartitions;
+	private final FileIOChannel.Enumerator spilledInMemoryPartitions;
 	
 	/**
 	 * Stores the initial partitions and a list of the files that contain the spilled contents

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
index e566e25..63e64c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.sort;
 
 import java.io.IOException;
@@ -36,9 +35,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.BlockChannelAccess;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
-import org.apache.flink.runtime.io.disk.iomanager.Channel;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -267,7 +265,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 				throw new IOException("The user-defined combiner failed in its 'open()' method.", t);
 			}
 			
-			final Channel.Enumerator enumerator = this.ioManager.createChannelEnumerator();
+			final FileIOChannel.Enumerator enumerator = this.ioManager.createChannelEnumerator();
 			List<ChannelWithBlockCount> channelIDs = new ArrayList<ChannelWithBlockCount>();
 
 			
@@ -296,7 +294,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 				}
 				
 				// open next channel
-				Channel.ID channel = enumerator.next();
+				FileIOChannel.ID channel = enumerator.next();
 				registerChannelToBeRemovedAtShudown(channel);
 				
 				if (LOG.isDebugEnabled()) {
@@ -304,8 +302,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 				}
 
 				// create writer
-				final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(
-																channel, this.numWriteBuffersToCluster);
+				final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel);
 				registerOpenChannelToBeRemovedAtShudown(writer);
 				final ChannelWriterOutputView output = new ChannelWriterOutputView(writer, this.writeMemory,
 																			this.memManager.getPageSize());
@@ -420,7 +417,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 				
 				// get the readers and register them to be released
 				final MergeIterator<E> mergeIterator = getMergingIterator(
-						channelIDs, readBuffers, new ArrayList<BlockChannelAccess<?, ?>>(channelIDs.size()));
+						channelIDs, readBuffers, new ArrayList<FileIOChannel>(channelIDs.size()));
 				
 				// set the target for the user iterator
 				// if the final merge combines, create a combining iterator around the merge iterator,
@@ -452,17 +449,16 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 		throws IOException
 		{
 			// the list with the readers, to be closed at shutdown
-			final List<BlockChannelAccess<?, ?>> channelAccesses = new ArrayList<BlockChannelAccess<?, ?>>(channelIDs.size());
+			final List<FileIOChannel> channelAccesses = new ArrayList<FileIOChannel>(channelIDs.size());
 
 			// the list with the target iterators
 			final MergeIterator<E> mergeIterator = getMergingIterator(channelIDs, readBuffers, channelAccesses);
 			final KeyGroupedIterator<E> groupedIter = new KeyGroupedIterator<E>(mergeIterator, this.serializer, this.comparator2);
 
 			// create a new channel writer
-			final Channel.ID mergedChannelID = this.ioManager.createChannel();
+			final FileIOChannel.ID mergedChannelID = this.ioManager.createChannel();
 			registerChannelToBeRemovedAtShudown(mergedChannelID);
-			final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(
-															mergedChannelID, this.numWriteBuffersToCluster);
+			final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(mergedChannelID);
 			registerOpenChannelToBeRemovedAtShudown(writer);
 			final ChannelWriterOutputView output = new ChannelWriterOutputView(writer, writeBuffers, 
 																			this.memManager.getPageSize());
@@ -488,7 +484,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 			
 			// remove the merged channel readers from the clear-at-shutdown list
 			for (int i = 0; i < channelAccesses.size(); i++) {
-				BlockChannelAccess<?, ?> access = channelAccesses.get(i);
+				FileIOChannel access = channelAccesses.get(i);
 				access.closeAndDelete();
 				unregisterOpenChannelToBeRemovedAtShudown(access);
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
index 3dd21f5..459ef82 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
@@ -37,14 +37,13 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator;
-import org.apache.flink.runtime.io.disk.iomanager.BlockChannelAccess;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
-import org.apache.flink.runtime.io.disk.iomanager.Channel;
 import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
 import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.Channel.ID;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -135,12 +134,12 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 	/**
 	 * Collection of all currently open channels, to be closed and deleted during cleanup.
 	 */
-	private final HashSet<BlockChannelAccess<?, ?>> openChannels;
+	private final HashSet<FileIOChannel> openChannels;
 	
 	/**
 	 * Collection of all temporary files created and to be removed when closing the sorter.
 	 */
-	private final HashSet<Channel.ID> channelsToDeleteAtShutdown;
+	private final HashSet<FileIOChannel.ID> channelsToDeleteAtShutdown;
 	
 	/**
 	 * The monitor which guards the iterator field.
@@ -387,8 +386,8 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 		};
 		
 		// create sets that track the channels we need to clean up when closing the sorter
-		this.channelsToDeleteAtShutdown = new HashSet<Channel.ID>(64);
-		this.openChannels = new HashSet<BlockChannelAccess<?,?>>(64);
+		this.channelsToDeleteAtShutdown = new HashSet<FileIOChannel.ID>(64);
+		this.openChannels = new HashSet<FileIOChannel>(64);
 
 		// start the thread that reads the input channels
 		this.readThread = getReadingThread(exceptionHandler, input, circularQueues, parentTask,
@@ -519,8 +518,8 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 			// we have to loop this, because it may fail with a concurrent modification exception
 			while (!this.openChannels.isEmpty()) {
 				try {
-					for (Iterator<BlockChannelAccess<?, ?>> channels = this.openChannels.iterator(); channels.hasNext(); ) {
-						final BlockChannelAccess<?, ?> channel = channels.next();
+					for (Iterator<FileIOChannel> channels = this.openChannels.iterator(); channels.hasNext(); ) {
+						final FileIOChannel channel = channels.next();
 						channels.remove();
 						channel.closeAndDelete();
 					}
@@ -531,8 +530,8 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 			// we have to loop this, because it may fail with a concurrent modification exception
 			while (!this.channelsToDeleteAtShutdown.isEmpty()) {
 				try {
-					for (Iterator<Channel.ID> channels = this.channelsToDeleteAtShutdown.iterator(); channels.hasNext(); ) {
-						final Channel.ID channel = channels.next();
+					for (Iterator<FileIOChannel.ID> channels = this.channelsToDeleteAtShutdown.iterator(); channels.hasNext(); ) {
+						final FileIOChannel.ID channel = channels.next();
 						channels.remove();
 						try {
 							final File f = new File(channel.getPath());
@@ -1257,7 +1256,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 			
 			// ------------------- Spilling Phase ------------------------
 			
-			final Channel.Enumerator enumerator = this.ioManager.createChannelEnumerator();			
+			final FileIOChannel.Enumerator enumerator = this.ioManager.createChannelEnumerator();			
 			List<ChannelWithBlockCount> channelIDs = new ArrayList<ChannelWithBlockCount>();
 
 			
@@ -1286,12 +1285,11 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 				}
 				
 				// open next channel
-				Channel.ID channel = enumerator.next();
+				FileIOChannel.ID channel = enumerator.next();
 				registerChannelToBeRemovedAtShudown(channel);
 
 				// create writer
-				final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(
-																channel, this.numWriteBuffersToCluster);
+				final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel);
 				registerOpenChannelToBeRemovedAtShudown(writer);
 				final ChannelWriterOutputView output = new ChannelWriterOutputView(writer, this.writeMemory,
 																			this.memManager.getPageSize());
@@ -1351,7 +1349,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 				getSegmentsForReaders(readBuffers, this.sortReadMemory, channelIDs.size());
 				
 				// get the readers and register them to be released
-				setResultIterator(getMergingIterator(channelIDs, readBuffers, new ArrayList<BlockChannelAccess<?, ?>>(channelIDs.size())));
+				setResultIterator(getMergingIterator(channelIDs, readBuffers, new ArrayList<FileIOChannel>(channelIDs.size())));
 			}
 
 			// done
@@ -1405,7 +1403,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 		 * @throws IOException Thrown, if the readers encounter an I/O problem.
 		 */
 		protected final MergeIterator<E> getMergingIterator(final List<ChannelWithBlockCount> channelIDs,
-				final List<List<MemorySegment>> inputSegments, List<BlockChannelAccess<?, ?>> readerList)
+				final List<List<MemorySegment>> inputSegments, List<FileIOChannel> readerList)
 			throws IOException
 		{
 			// create one iterator per channel id
@@ -1420,9 +1418,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 				final List<MemorySegment> segsForChannel = inputSegments.get(i);
 				
 				// create a reader. if there are multiple segments for the reader, issue multiple together per I/O request
-				final BlockChannelReader reader = segsForChannel.size() >= 4 ? 
-					this.ioManager.createBlockChannelReader(channel.getChannel(), segsForChannel.size() / 2) :
-					this.ioManager.createBlockChannelReader(channel.getChannel());
+				final BlockChannelReader reader = this.ioManager.createBlockChannelReader(channel.getChannel());
 					
 				readerList.add(reader);
 				registerOpenChannelToBeRemovedAtShudown(reader);
@@ -1495,16 +1491,15 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 		throws IOException
 		{
 			// the list with the readers, to be closed at shutdown
-			final List<BlockChannelAccess<?, ?>> channelAccesses = new ArrayList<BlockChannelAccess<?, ?>>(channelIDs.size());
+			final List<FileIOChannel> channelAccesses = new ArrayList<FileIOChannel>(channelIDs.size());
 
 			// the list with the target iterators
 			final MergeIterator<E> mergeIterator = getMergingIterator(channelIDs, readBuffers, channelAccesses);
 
 			// create a new channel writer
-			final Channel.ID mergedChannelID = this.ioManager.createChannel();
+			final FileIOChannel.ID mergedChannelID = this.ioManager.createChannel();
 			registerChannelToBeRemovedAtShudown(mergedChannelID);
-			final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(
-															mergedChannelID, this.numWriteBuffersToCluster);
+			final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(mergedChannelID);
 			registerOpenChannelToBeRemovedAtShudown(writer);
 			final ChannelWriterOutputView output = new ChannelWriterOutputView(writer, writeBuffers, 
 																			this.memManager.getPageSize());
@@ -1523,7 +1518,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 			
 			// remove the merged channel readers from the clear-at-shutdown list
 			for (int i = 0; i < channelAccesses.size(); i++) {
-				BlockChannelAccess<?, ?> access = channelAccesses.get(i);
+				FileIOChannel access = channelAccesses.get(i);
 				access.closeAndDelete();
 				unregisterOpenChannelToBeRemovedAtShudown(access);
 			}
@@ -1577,7 +1572,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 		 * 
 		 * @param channel The channel id.
 		 */
-		protected void registerChannelToBeRemovedAtShudown(Channel.ID channel) {
+		protected void registerChannelToBeRemovedAtShudown(FileIOChannel.ID channel) {
 			UnilateralSortMerger.this.channelsToDeleteAtShutdown.add(channel);
 		}
 
@@ -1586,7 +1581,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 		 * 
 		 * @param channel The channel id.
 		 */
-		protected void unregisterChannelToBeRemovedAtShudown(Channel.ID channel) {
+		protected void unregisterChannelToBeRemovedAtShudown(FileIOChannel.ID channel) {
 			UnilateralSortMerger.this.channelsToDeleteAtShutdown.remove(channel);
 		}
 		
@@ -1595,7 +1590,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 		 * 
 		 * @param channel The channel reader/writer.
 		 */
-		protected void registerOpenChannelToBeRemovedAtShudown(BlockChannelAccess<?, ?> channel) {
+		protected void registerOpenChannelToBeRemovedAtShudown(FileIOChannel channel) {
 			UnilateralSortMerger.this.openChannels.add(channel);
 		}
 
@@ -1604,7 +1599,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 		 * 
 		 * @param channel The channel reader/writer.
 		 */
-		protected void unregisterOpenChannelToBeRemovedAtShudown(BlockChannelAccess<?, ?> channel) {
+		protected void unregisterOpenChannelToBeRemovedAtShudown(FileIOChannel channel) {
 			UnilateralSortMerger.this.openChannels.remove(channel);
 		}
 	}
@@ -1769,7 +1764,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 	
 	protected static final class ChannelWithBlockCount
 	{
-		private final Channel.ID channel;
+		private final FileIOChannel.ID channel;
 		private final int blockCount;
 		
 		public ChannelWithBlockCount(ID channel, int blockCount) {
@@ -1777,7 +1772,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 			this.blockCount = blockCount;
 		}
 
-		public Channel.ID getChannel() {
+		public FileIOChannel.ID getChannel() {
 			return channel;
 		}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
index 7220b23..93bece2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
@@ -75,6 +75,7 @@ import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.ChannelManager;
 import org.apache.flink.runtime.io.network.LocalConnectionManager;
 import org.apache.flink.runtime.io.network.NetworkConnectionManager;
@@ -354,7 +355,7 @@ public class TaskManager implements TaskOperationProtocol {
 						(blobServerAddress), GlobalConfiguration.getConfiguration());
 			}
 		}
-		this.ioManager = new IOManager(tmpDirPaths);
+		this.ioManager = new IOManagerAsync(tmpDirPaths);
 		
 		// start the heart beats
 		{

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
index 72fe3f1..535c756 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
@@ -27,6 +27,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.security.UserGroupInformation;
 
+/**
+ * Utility class that gives access to the execution environment of the JVM, like
+ * the executing user, startup options, or the JVM version.
+ */
 public class EnvironmentInformation {
 
 	private static final Logger LOG = LoggerFactory.getLogger(EnvironmentInformation.class);
@@ -183,6 +187,10 @@ public class EnvironmentInformation {
 			return UNKNOWN;
 		}
 	}
+	
+	public static String getTemporaryFileDirectory() {
+		return System.getProperty("java.io.tmpdir");
+	}
 
 	public static void logEnvironmentInfo(Logger log, String componentName) {
 		if (log.isInfoEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
index 879bd37..b4070ed 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
@@ -55,7 +55,7 @@ public final class BlobKeyTest {
 	}
 
 	/**
-	 * Tests the serialization/deserialization of BLOB keys using the regular {@link IOReadableWritable} API.
+	 * Tests the serialization/deserialization of BLOB keys using the regular {@link org.apache.flink.core.io.IOReadableWritable} API.
 	 */
 	@Test
 	public void testSerialization() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
index 2073061..c05fcca 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
@@ -23,14 +23,14 @@ import java.io.EOFException;
 import java.util.List;
 
 import org.junit.Assert;
-
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
-import org.apache.flink.runtime.io.disk.iomanager.Channel;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
 import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -79,7 +79,7 @@ public class ChannelViewsTest
 	@Before
 	public void beforeTest() {
 		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1, MEMORY_PAGE_SIZE);
-		this.ioManager = new IOManager();
+		this.ioManager = new IOManagerAsync();
 	}
 
 	@After
@@ -103,7 +103,7 @@ public class ChannelViewsTest
 	public void testWriteReadSmallRecords() throws Exception
 	{
 		final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-		final Channel.ID channel = this.ioManager.createChannel();
+		final FileIOChannel.ID channel = this.ioManager.createChannel();
 		
 		// create the writer output view
 		List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
@@ -147,7 +147,7 @@ public class ChannelViewsTest
 	public void testWriteAndReadLongRecords() throws Exception
 	{
 		final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LONG_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-		final Channel.ID channel = this.ioManager.createChannel();
+		final FileIOChannel.ID channel = this.ioManager.createChannel();
 		
 		// create the writer output view
 		List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
@@ -188,7 +188,7 @@ public class ChannelViewsTest
 	public void testReadTooMany() throws Exception
 	{
 		final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-		final Channel.ID channel = this.ioManager.createChannel();
+		final FileIOChannel.ID channel = this.ioManager.createChannel();
 		
 		// create the writer output view
 		List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
@@ -239,7 +239,7 @@ public class ChannelViewsTest
 	public void testReadWithoutKnownBlockCount() throws Exception
 	{
 		final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-		final Channel.ID channel = this.ioManager.createChannel();
+		final FileIOChannel.ID channel = this.ioManager.createChannel();
 		
 		// create the writer output view
 		List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
@@ -283,7 +283,7 @@ public class ChannelViewsTest
 	public void testWriteReadOneBufferOnly() throws Exception
 	{
 		final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-		final Channel.ID channel = this.ioManager.createChannel();
+		final FileIOChannel.ID channel = this.ioManager.createChannel();
 		
 		// create the writer output view
 		List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, 1);
@@ -327,7 +327,7 @@ public class ChannelViewsTest
 	public void testWriteReadNotAll() throws Exception
 	{
 		final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-		final Channel.ID channel = this.ioManager.createChannel();
+		final FileIOChannel.ID channel = this.ioManager.createChannel();
 		
 		// create the writer output view
 		List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
index 629d1dc..22c40f6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
@@ -23,11 +23,11 @@ import java.io.EOFException;
 import java.util.ArrayList;
 
 import org.junit.Assert;
-
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.SpillingBuffer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.ListMemorySegmentSource;
@@ -71,7 +71,7 @@ public class SpillingBufferTest {
 	@Before
 	public void beforeTest() {
 		memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
-		ioManager = new IOManager();
+		ioManager = new IOManagerAsync();
 	}
 
 	@After


[3/3] incubator-flink git commit: [FLINK-1323] Refactor I/O Manager Readers and Writers to interfaces, add implementation that uses callbacks on completed write requests.

Posted by uc...@apache.org.
[FLINK-1323] Refactor I/O Manager Readers and Writers to interfaces, add implementation that uses callbacks on completed write requests.

 - This change also allows for a very simple way of plugging in a synchronous version of the I/O manager.

This closes #193.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/c9cfe3ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/c9cfe3ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/c9cfe3ba

Branch: refs/heads/master
Commit: c9cfe3ba9a2009c3da2cb8a39090154c30ccd88c
Parents: 8e4c772
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Nov 7 18:45:19 2014 +0100
Committer: uce <uc...@apache.org>
Committed: Tue Nov 11 11:48:14 2014 +0100

----------------------------------------------------------------------
 .../io/disk/ChannelReaderInputViewIterator.java |   6 +-
 .../disk/iomanager/AbstractFileIOChannel.java   | 106 ++++
 .../disk/iomanager/AsynchronousBlockReader.java | 131 +++++
 .../disk/iomanager/AsynchronousBlockWriter.java |  88 +++
 .../AsynchronousBlockWriterWithCallback.java    |  67 +++
 .../iomanager/AsynchronousBulkBlockReader.java  | 107 ++++
 .../iomanager/AsynchronousFileIOChannel.java    | 264 +++++++++
 .../io/disk/iomanager/BlockChannelAccess.java   | 272 ---------
 .../io/disk/iomanager/BlockChannelReader.java   |  87 +--
 .../io/disk/iomanager/BlockChannelWriter.java   |  89 +--
 .../BlockChannelWriterWithCallback.java         |  35 ++
 .../disk/iomanager/BulkBlockChannelReader.java  |  59 +-
 .../runtime/io/disk/iomanager/Channel.java      | 109 ----
 .../io/disk/iomanager/ChannelAccess.java        | 172 ------
 .../disk/iomanager/ChannelReaderInputView.java  |   3 +-
 .../disk/iomanager/ChannelWriterOutputView.java |   3 +-
 .../io/disk/iomanager/FileIOChannel.java        | 156 ++++++
 .../runtime/io/disk/iomanager/IOManager.java    | 547 ++-----------------
 .../io/disk/iomanager/IOManagerAsync.java       | 449 +++++++++++++++
 .../runtime/io/disk/iomanager/IORequest.java    |  18 +-
 .../io/disk/iomanager/QueuingCallback.java      |  47 ++
 .../io/disk/iomanager/RequestDoneCallback.java  |  36 ++
 .../runtime/io/disk/iomanager/RequestQueue.java |  22 +-
 .../iomanager/SynchronousFileIOChannel.java     |  45 ++
 .../iterative/io/SerializedUpdateBuffer.java    |   4 +-
 .../runtime/operators/hash/HashPartition.java   |   8 +-
 .../operators/hash/MutableHashTable.java        |   4 +-
 .../operators/hash/ReOpenableHashPartition.java |   6 +-
 .../hash/ReOpenableMutableHashTable.java        |   4 +-
 .../sort/CombiningUnilateralSortMerger.java     |  22 +-
 .../operators/sort/UnilateralSortMerger.java    |  57 +-
 .../flink/runtime/taskmanager/TaskManager.java  |   3 +-
 .../runtime/util/EnvironmentInformation.java    |   8 +
 .../apache/flink/runtime/blob/BlobKeyTest.java  |   2 +-
 .../flink/runtime/io/disk/ChannelViewsTest.java |  18 +-
 .../runtime/io/disk/SpillingBufferTest.java     |   4 +-
 .../io/disk/iomanager/IOManagerITCase.java      |   4 +-
 .../IOManagerPerformanceBenchmark.java          |  17 +-
 .../io/disk/iomanager/IOManagerTest.java        |  65 +--
 .../flink/runtime/operators/CrossTaskTest.java  |   7 +-
 .../operators/MatchTaskExternalITCase.java      |   5 +-
 .../flink/runtime/operators/MatchTaskTest.java  |   5 +-
 .../operators/hash/HashMatchIteratorITCase.java |   8 +-
 .../runtime/operators/hash/HashTableITCase.java |   9 +-
 .../hash/HashTablePerformanceComparison.java    |   3 +-
 .../hash/ReOpenableHashTableITCase.java         |   5 +-
 .../SpillingResettableIteratorTest.java         |   3 +-
 ...lingResettableMutableObjectIteratorTest.java |   3 +-
 .../CombiningUnilateralSortMergerITCase.java    |   3 +-
 .../operators/sort/ExternalSortITCase.java      |   3 +-
 .../sort/MassiveStringSortingITCase.java        |   5 +-
 .../sort/SortMergeMatchIteratorITCase.java      |  11 +-
 .../operators/testutils/DriverTestBase.java     |   3 +-
 .../operators/testutils/MockEnvironment.java    |   3 +-
 .../operators/util/HashVsSortMiniBenchmark.java |   5 +-
 55 files changed, 1776 insertions(+), 1449 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java
index 7eaf635..f38aa25 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java
@@ -27,7 +27,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
-import org.apache.flink.runtime.io.disk.iomanager.Channel;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.util.MutableObjectIterator;
@@ -46,14 +46,14 @@ public class ChannelReaderInputViewIterator<E> implements MutableObjectIterator<
 	private final List<MemorySegment> freeMemTarget;
 	
 	
-	public ChannelReaderInputViewIterator(IOManager ioAccess, Channel.ID channel, List<MemorySegment> segments,
+	public ChannelReaderInputViewIterator(IOManager ioAccess, FileIOChannel.ID channel, List<MemorySegment> segments,
 			List<MemorySegment> freeMemTarget, TypeSerializer<E> accessors, int numBlocks)
 	throws IOException
 	{
 		this(ioAccess, channel, new LinkedBlockingQueue<MemorySegment>(), segments, freeMemTarget, accessors, numBlocks);
 	}
 		
-	public ChannelReaderInputViewIterator(IOManager ioAccess, Channel.ID channel,  LinkedBlockingQueue<MemorySegment> returnQueue,
+	public ChannelReaderInputViewIterator(IOManager ioAccess, FileIOChannel.ID channel,  LinkedBlockingQueue<MemorySegment> returnQueue,
 			List<MemorySegment> segments, List<MemorySegment> freeMemTarget, TypeSerializer<E> accessors, int numBlocks)
 	throws IOException
 	{

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java
new file mode 100644
index 0000000..ecb794e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public abstract class AbstractFileIOChannel implements FileIOChannel {
+
+	/** Logger object for channel and its subclasses */
+	protected static final Logger LOG = LoggerFactory.getLogger(FileIOChannel.class);
+	
+	/** The ID of the underlying channel. */
+	protected final FileIOChannel.ID id;
+	
+	/** A file channel for NIO access to the file. */
+	protected final FileChannel fileChannel;
+	
+	
+	/**
+	 * Creates a new channel to the path indicated by the given ID. The channel hands IO requests to
+	 * the given request queue to be processed.
+	 * 
+	 * @param channelID The id describing the path of the file that the channel accessed.
+	 * @param writeEnabled Flag describing whether the channel should be opened in read/write mode, rather
+	 *                     than in read-only mode.
+	 * @throws IOException Thrown, if the channel could no be opened.
+	 */
+	protected AbstractFileIOChannel(FileIOChannel.ID channelID, boolean writeEnabled) throws IOException {
+		this.id = Preconditions.checkNotNull(channelID);
+		
+		try {
+			@SuppressWarnings("resource")
+			RandomAccessFile file = new RandomAccessFile(id.getPath(), writeEnabled ? "rw" : "r");
+			this.fileChannel = file.getChannel();
+		}
+		catch (IOException e) {
+			throw new IOException("Channel to path '" + channelID.getPath() + "' could not be opened.", e);
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Gets the channel ID of this channel.
+	 * 
+	 * @return This channel's ID.
+	 */
+	@Override
+	public final FileIOChannel.ID getChannelID() {
+		return this.id;
+	}
+	
+	@Override
+	public abstract boolean isClosed();
+	
+	@Override
+	public abstract void close() throws IOException;
+	
+	@Override
+	public void deleteChannel() {
+		if (!isClosed() || this.fileChannel.isOpen()) {
+			throw new IllegalStateException("Cannot delete a channel that is open.");
+		}
+	
+		// make a best effort to delete the file. Don't report exceptions.
+		try {
+			File f = new File(this.id.getPath());
+			if (f.exists()) {
+				f.delete();
+			}
+		} catch (Throwable t) {}
+	}
+	
+	@Override
+	public void closeAndDelete() throws IOException {
+		try {
+			close();
+		} finally {
+			deleteChannel();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java
new file mode 100644
index 0000000..35273f4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+/**
+ * A reader that reads data in blocks from a file channel. The reader reads the blocks into a 
+ * {@link org.apache.flink.core.memory.MemorySegment} in an asynchronous fashion. That is, a read
+ * request is not processed by the thread that issues it, but by an asynchronous reader thread. Once the read request
+ * is done, the asynchronous reader adds the full MemorySegment to a <i>return queue</i> where it can be popped by the
+ * worker thread, once it needs the data. The return queue is in this case a
+ * {@link java.util.concurrent.LinkedBlockingQueue}, such that the working thread blocks until the request has been served,
+ * if the request is still pending when the it requires the data. 
+ * <p>
+ * Typical pre-fetching reads are done by issuing the read requests early and popping the return queue once the data
+ * is actually needed.
+ * <p>
+ * The reader has no notion whether the size of the memory segments is actually the size of the blocks on disk,
+ * or even whether the file was written in blocks of the same size, or in blocks at all. Ensuring that the
+ * writing and reading is consistent with each other (same blocks sizes) is up to the programmer.  
+ */
+public class AsynchronousBlockReader extends AsynchronousFileIOChannel<ReadRequest> implements BlockChannelReader {
+	
+	private final LinkedBlockingQueue<MemorySegment> returnSegments;
+	
+	/**
+	 * Creates a new block channel reader for the given channel.
+	 *  
+	 * @param channelID The ID of the channel to read.
+	 * @param requestQueue The request queue of the asynchronous reader thread, to which the I/O requests
+	 *                     are added.
+	 * @param returnSegments The return queue, to which the full Memory Segments are added.
+	 * @throws IOException Thrown, if the underlying file channel could not be opened.
+	 */
+	protected AsynchronousBlockReader(FileIOChannel.ID channelID, RequestQueue<ReadRequest> requestQueue,
+			LinkedBlockingQueue<MemorySegment> returnSegments)
+	throws IOException
+	{
+		super(channelID, requestQueue, new QueuingCallback(returnSegments), false);
+		this.returnSegments = returnSegments;
+	}	
+
+	/**
+	 * Issues a read request, which will asynchronously fill the given segment with the next block in the
+	 * underlying file channel. Once the read request is fulfilled, the segment will be added to this reader's
+	 * return queue.
+	 *  
+	 * @param segment The segment to read the block into.
+	 * @throws IOException Thrown, when the reader encounters an I/O error. Due to the asynchronous nature of the
+	 *                     reader, the exception thrown here may have been caused by an earlier read request. 
+	 */
+	@Override
+	public void readBlock(MemorySegment segment) throws IOException {
+		// check the error state of this channel
+		checkErroneous();
+		
+		// write the current buffer and get the next one
+		// the statements have to be in this order to avoid incrementing the counter
+		// after the channel has been closed
+		this.requestsNotReturned.incrementAndGet();
+		if (this.closed || this.requestQueue.isClosed()) {
+			// if we found ourselves closed after the counter increment,
+			// decrement the counter again and do not forward the request
+			this.requestsNotReturned.decrementAndGet();
+			throw new IOException("The reader has been closed.");
+		}
+		this.requestQueue.add(new SegmentReadRequest(this, segment));
+	}
+	
+	/**
+	 * Gets the next memory segment that has been filled with data by the reader. This method blocks until
+	 * such a segment is available, or until an error occurs in the reader, or the reader is closed.
+	 * <p>
+	 * WARNING: If this method is invoked without any segment ever returning (for example, because the
+	 * {@link #readBlock(MemorySegment)} method has not been invoked appropriately), the method may block
+	 * forever.
+	 * 
+	 * @return The next memory segment from the reader's return queue.
+	 * @throws IOException Thrown, if an I/O error occurs in the reader while waiting for the request to return.
+	 */
+	@Override
+	public MemorySegment getNextReturnedSegment() throws IOException {
+		try {
+			while (true) {
+				final MemorySegment next = this.returnSegments.poll(1000, TimeUnit.MILLISECONDS);
+				if (next != null) {
+					return next;
+				} else {
+					if (this.closed) {
+						throw new IOException("The reader has been asynchronously closed.");
+					}
+					checkErroneous();
+				}
+			}
+		} catch (InterruptedException iex) {
+			throw new IOException("Reader was interrupted while waiting for the next returning segment.");
+		}
+	}
+	
+	/**
+	 * Gets the queue in which the full memory segments are queued after the asynchronous read
+	 * is complete.
+	 * 
+	 * @return The queue with the full memory segments.
+	 */
+	@Override
+	public LinkedBlockingQueue<MemorySegment> getReturnQueue() {
+		return this.returnSegments;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriter.java
new file mode 100644
index 0000000..7e1681f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriter.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+public class AsynchronousBlockWriter extends AsynchronousBlockWriterWithCallback implements BlockChannelWriter {
+	
+	private final LinkedBlockingQueue<MemorySegment> returnSegments;
+	
+	/**
+	 * Creates a new block channel writer for the given channel.
+	 *  
+	 * @param channelID The ID of the channel to write to.
+	 * @param requestQueue The request queue of the asynchronous writer thread, to which the I/O requests
+	 *                     are added.
+	 * @param returnSegments The return queue, to which the processed Memory Segments are added.
+	 * @throws IOException Thrown, if the underlying file channel could not be opened exclusively.
+	 */
+	protected AsynchronousBlockWriter(FileIOChannel.ID channelID, RequestQueue<WriteRequest> requestQueue,
+			LinkedBlockingQueue<MemorySegment> returnSegments)
+	throws IOException
+	{
+		super(channelID, requestQueue, new QueuingCallback(returnSegments));
+		this.returnSegments = returnSegments;
+	}
+	
+	/**
+	 * Gets the next memory segment that has been written and is available again.
+	 * This method blocks until such a segment is available, or until an error occurs in the writer, or the
+	 * writer is closed.
+	 * <p>
+	 * NOTE: If this method is invoked without any segment ever returning (for example, because the
+	 * {@link #writeBlock(MemorySegment)} method has not been invoked accordingly), the method may block
+	 * forever.
+	 * 
+	 * @return The next memory segment from the writers's return queue.
+	 * @throws IOException Thrown, if an I/O error occurs in the writer while waiting for the request to return.
+	 */
+	@Override
+	public MemorySegment getNextReturnedSegment() throws IOException {
+		try {
+			while (true) {
+				final MemorySegment next = returnSegments.poll(1000, TimeUnit.MILLISECONDS);
+				if (next != null) {
+					return next;
+				} else {
+					if (this.closed) {
+						throw new IOException("The writer has been closed.");
+					}
+					checkErroneous();
+				}
+			}
+		} catch (InterruptedException e) {
+			throw new IOException("Writer was interrupted while waiting for the next returning segment.");
+		}
+	}
+	
+	/**
+	 * Gets the queue in which the memory segments are queued after the asynchronous write is completed.
+	 * 
+	 * @return The queue with the written memory segments.
+	 */
+	@Override
+	public LinkedBlockingQueue<MemorySegment> getReturnQueue() {
+		return this.returnSegments;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriterWithCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriterWithCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriterWithCallback.java
new file mode 100644
index 0000000..6b6fb36
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriterWithCallback.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+/**
+ * An asynchronous implementation of the {@link BlockChannelWriterWithCallback} that queues I/O requests
+ * and calls a callback once they have been handled.
+ */
+public class AsynchronousBlockWriterWithCallback extends AsynchronousFileIOChannel<WriteRequest> implements BlockChannelWriterWithCallback {
+	
+	/**
+	 * Creates a new asynchronous block writer for the given channel.
+	 *  
+	 * @param channelID The ID of the channel to write to.
+	 * @param requestQueue The request queue of the asynchronous writer thread, to which the I/O requests are added.
+	 * @param callback The callback to be invoked when requests are done.
+	 * @throws IOException Thrown, if the underlying file channel could not be opened exclusively.
+	 */
+	protected AsynchronousBlockWriterWithCallback(FileIOChannel.ID channelID, RequestQueue<WriteRequest> requestQueue,
+			RequestDoneCallback callback) throws IOException
+	{
+		super(channelID, requestQueue, callback, true);
+	}
+
+	/**
+	 * Issues a asynchronous write request to the writer.
+	 * 
+	 * @param segment The segment to be written.
+	 * @throws IOException Thrown, when the writer encounters an I/O error. Due to the asynchronous nature of the
+	 *                     writer, the exception thrown here may have been caused by an earlier write request. 
+	 */
+	@Override
+	public void writeBlock(MemorySegment segment) throws IOException {
+		// check the error state of this channel
+		checkErroneous();
+		
+		// write the current buffer and get the next one
+		this.requestsNotReturned.incrementAndGet();
+		if (this.closed || this.requestQueue.isClosed()) {
+			// if we found ourselves closed after the counter increment,
+			// decrement the counter again and do not forward the request
+			this.requestsNotReturned.decrementAndGet();
+			throw new IOException("The writer has been closed.");
+		}
+		this.requestQueue.add(new SegmentWriteRequest(this, segment));
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBulkBlockReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBulkBlockReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBulkBlockReader.java
new file mode 100644
index 0000000..048f82f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBulkBlockReader.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+/**
+ *
+ */
+public class AsynchronousBulkBlockReader extends AsynchronousFileIOChannel<ReadRequest> implements BulkBlockChannelReader {
+	
+	private final ArrayList<MemorySegment> returnBuffers;
+	
+	
+	protected AsynchronousBulkBlockReader(FileIOChannel.ID channelID, RequestQueue<ReadRequest> requestQueue, 
+			List<MemorySegment> sourceSegments, int numBlocks)
+	throws IOException
+	{
+		this (channelID, requestQueue, sourceSegments, numBlocks, new ArrayList<MemorySegment>(numBlocks));
+	}
+	
+	private AsynchronousBulkBlockReader(FileIOChannel.ID channelID, RequestQueue<ReadRequest> requestQueue, 
+			List<MemorySegment> sourceSegments, int numBlocks, ArrayList<MemorySegment> target)
+	throws IOException
+	{
+		super(channelID, requestQueue, new CollectingCallback(target), false);
+		this.returnBuffers = target;
+		
+		// sanity check
+		if (sourceSegments.size() < numBlocks) {
+			throw new IllegalArgumentException("The list of source memory segments must contain at least" +
+					" as many segments as the number of blocks to read.");
+		}
+		
+		// send read requests for all blocks
+		for (int i = 0; i < numBlocks; i++) {
+			readBlock(sourceSegments.remove(sourceSegments.size() - 1));
+		}
+	}
+	
+	private void readBlock(MemorySegment segment) throws IOException {
+		// check the error state of this channel
+		checkErroneous();
+		
+		// write the current buffer and get the next one
+		this.requestsNotReturned.incrementAndGet();
+		if (this.closed || this.requestQueue.isClosed()) {
+			// if we found ourselves closed after the counter increment,
+			// decrement the counter again and do not forward the request
+			this.requestsNotReturned.decrementAndGet();
+			throw new IOException("The reader has been closed.");
+		}
+		this.requestQueue.add(new SegmentReadRequest(this, segment));
+	}
+	
+	@Override
+	public List<MemorySegment> getFullSegments() {
+		synchronized (this.closeLock) {
+			if (!this.isClosed() || this.requestsNotReturned.get() > 0) {
+				throw new IllegalStateException("Full segments can only be obtained after the reader was properly closed.");
+			}
+		}
+		
+		return this.returnBuffers;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	private static final class CollectingCallback implements RequestDoneCallback {
+		
+		private final ArrayList<MemorySegment> list;
+
+		public CollectingCallback(ArrayList<MemorySegment> list) {
+			this.list = list;
+		}
+		
+		@Override
+		public void requestSuccessful(MemorySegment buffer) {
+			list.add(buffer);
+		}
+		
+		@Override
+		public void requestFailed(MemorySegment buffer, IOException e) {
+			list.add(buffer);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
new file mode 100644
index 0000000..098b334
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+/**
+ * A base class for readers and writers that accept read or write requests for whole blocks.
+ * The request is delegated to an asynchronous I/O thread. After completion of the I/O request, the memory
+ * segment of the block is added to a collection to be returned.
+ * <p>
+ * The asynchrony of the access makes it possible to implement read-ahead or write-behind types of I/O accesses.
+ * 
+ * @param <R> The type of request (e.g. <tt>ReadRequest</tt> or <tt>WriteRequest</tt> issued by this access to the I/O threads.
+ */
+public abstract class AsynchronousFileIOChannel<R extends IORequest> extends AbstractFileIOChannel {
+	
+	/** The lock that is used during closing to synchronize the thread that waits for all
+	 * requests to be handled with the asynchronous I/O thread. */
+	protected final Object closeLock = new Object();
+	
+	/** A request queue for submitting asynchronous requests to the corresponding IO worker thread. */
+	protected final RequestQueue<R> requestQueue;
+	
+	/** An atomic integer that counts the number of requests that we still wait for to return. */
+	protected final AtomicInteger requestsNotReturned = new AtomicInteger(0);
+	
+	/** Hander for completed requests */
+	protected final RequestDoneCallback resultHander;
+	
+	/** An exception that was encountered by the asynchronous request handling thread.*/
+	protected volatile IOException exception;
+	
+	/** Flag marking this channel as closed */
+	protected volatile boolean closed;
+
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Creates a new channel access to the path indicated by the given ID. The channel accepts buffers to be
+	 * read/written and hands them to the asynchronous I/O thread. After being processed, the buffers 
+	 * are returned by adding the to the given queue.
+	 * 
+	 * @param channelID The id describing the path of the file that the channel accessed.
+	 * @param requestQueue The queue that this channel hands its IO requests to.
+	 * @param callback The callback to be invoked when a request is done.
+	 * @param writeEnabled Flag describing whether the channel should be opened in read/write mode, rather
+	 *                     than in read-only mode.
+	 * @throws IOException Thrown, if the channel could no be opened.
+	 */
+	protected AsynchronousFileIOChannel(FileIOChannel.ID channelID, RequestQueue<R> requestQueue, 
+			RequestDoneCallback callback, boolean writeEnabled) throws IOException
+	{
+		super(channelID, writeEnabled);
+		
+		if (requestQueue == null) {
+			throw new NullPointerException();
+		}
+		
+		this.requestQueue = requestQueue;
+		this.resultHander = callback;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public boolean isClosed() {
+		return this.closed;
+	}
+	
+	/**
+	 * Closes the reader and waits until all pending asynchronous requests are
+	 * handled. Even if an exception interrupts the closing, the underlying <tt>FileChannel</tt> is closed.
+	 * 
+	 * @throws IOException Thrown, if an I/O exception occurred while waiting for the buffers, or if
+	 *                     the closing was interrupted.
+	 */
+	public void close() throws IOException {
+		// atomically set the close flag
+		synchronized (this.closeLock) {
+			if (this.closed) {
+				return;
+			}
+			this.closed = true;
+			
+			try {
+				// wait until as many buffers have been returned as were written
+				// only then is everything guaranteed to be consistent.{
+				while (this.requestsNotReturned.get() > 0) {
+					try {
+						// we add a timeout here, because it is not guaranteed that the
+						// decrementing during buffer return and the check here are deadlock free.
+						// the deadlock situation is however unlikely and caught by the timeout
+						this.closeLock.wait(1000);
+						checkErroneous();
+					}
+					catch (InterruptedException iex) {}
+				}
+			}
+			finally {
+				// close the file
+				if (this.fileChannel.isOpen()) {
+					this.fileChannel.close();
+				}
+			}
+		}
+	}
+	
+	/**
+	 * This method waits for all pending asynchronous requests to return. When the
+	 * last request has returned, the channel is closed and deleted.
+	 * <p>
+	 * Even if an exception interrupts the closing, such that not all request are handled,
+	 * the underlying <tt>FileChannel</tt> is closed and deleted.
+	 * 
+	 * @throws IOException Thrown, if an I/O exception occurred while waiting for the buffers, or if the closing was interrupted.
+	 */
+	public void closeAndDelete() throws IOException {
+		try {
+			close();
+		}
+		finally {
+			deleteChannel();
+		}
+	}
+	
+	/**
+	 * Checks the exception state of this channel. The channel is erroneous, if one of its requests could not
+	 * be processed correctly.
+	 * 
+	 * @throws IOException Thrown, if the channel is erroneous. The thrown exception contains the original exception
+	 *                     that defined the erroneous state as its cause.
+	 */
+	public final void checkErroneous() throws IOException {
+		if (this.exception != null) {
+			throw this.exception;
+		}
+	}
+	
+	/**
+	 * Handles a processed <tt>Buffer</tt>. This method is invoked by the
+	 * asynchronous IO worker threads upon completion of the IO request with the
+	 * provided buffer and/or an exception that occurred while processing the request
+	 * for that buffer.
+	 * 
+	 * @param buffer The buffer to be processed.
+	 * @param ex The exception that occurred in the I/O threads when processing the buffer's request.
+	 */
+	final void handleProcessedBuffer(MemorySegment buffer, IOException ex) {
+		// even if the callbacks throw an error, we need to maintain our bookkeeping
+		try {
+			if (ex != null && this.exception == null) {
+				this.exception = ex;
+				this.resultHander.requestFailed(buffer, ex);
+			}
+			else {
+				this.resultHander.requestSuccessful(buffer);
+			}
+		}
+		finally {
+			// decrement the number of missing buffers. If we are currently closing, notify the 
+			if (this.closed) {
+				synchronized (this.closeLock) {
+					int num = this.requestsNotReturned.decrementAndGet();
+					if (num == 0) {
+						this.closeLock.notifyAll();
+					}
+				}
+			}
+			else {
+				this.requestsNotReturned.decrementAndGet();
+			}
+		}
+	}
+}
+
+//--------------------------------------------------------------------------------------------
+
+/**
+ * Read request that reads an entire memory segment from a block reader.
+ */
+final class SegmentReadRequest implements ReadRequest {
+	
+	private final AsynchronousFileIOChannel<ReadRequest> channel;
+	
+	private final MemorySegment segment;
+	
+	protected SegmentReadRequest(AsynchronousFileIOChannel<ReadRequest> targetChannel, MemorySegment segment) {
+		this.channel = targetChannel;
+		this.segment = segment;
+	}
+
+	@Override
+	public void read() throws IOException {
+		final FileChannel c = this.channel.fileChannel;
+		if (c.size() - c.position() > 0) {
+			try {
+				final ByteBuffer wrapper = this.segment.wrap(0, this.segment.size());
+				this.channel.fileChannel.read(wrapper);
+			}
+			catch (NullPointerException npex) {
+				throw new IOException("Memory segment has been released.");
+			}
+		}
+	}
+
+	@Override
+	public void requestDone(IOException ioex) {
+		this.channel.handleProcessedBuffer(this.segment, ioex);
+	}
+}
+
+//--------------------------------------------------------------------------------------------
+
+/**
+ * Write request that writes an entire memory segment to the block writer.
+ */
+final class SegmentWriteRequest implements WriteRequest {
+	
+	private final AsynchronousFileIOChannel<WriteRequest> channel;
+	
+	private final MemorySegment segment;
+	
+	protected SegmentWriteRequest(AsynchronousFileIOChannel<WriteRequest> targetChannel, MemorySegment segment) {
+		this.channel = targetChannel;
+		this.segment = segment;
+	}
+
+	@Override
+	public void write() throws IOException {
+		try {
+			this.channel.fileChannel.write(this.segment.wrap(0, this.segment.size()));
+		}
+		catch (NullPointerException npex) {
+			throw new IOException("Memory segment has been released.");
+		}
+	}
+
+	@Override
+	public void requestDone(IOException ioex) {
+		this.channel.handleProcessedBuffer(this.segment, ioex);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelAccess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelAccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelAccess.java
deleted file mode 100644
index f19586d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelAccess.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.disk.iomanager;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.Collection;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.flink.core.memory.MemorySegment;
-
-
-/**
- * A base class for readers and writers that accept read or write requests for whole blocks.
- * The request is delegated to an asynchronous I/O thread. After completion of the I/O request, the memory
- * segment of the block is added to a collection to be returned.
- * <p>
- * The asynchrony of the access makes it possible to implement read-ahead or write-behind types of I/O accesses.
- * 
- * 
- * @param <R> The type of request (e.g. <tt>ReadRequest</tt> or <tt>WriteRequest</tt> issued by this access to
- *            the I/O threads.
- * @param <C> The type of collection used to collect the segments from completed requests. Those segments are for
- *            example for write requests the written and reusable segments, and for read requests the now full
- *            and usable segments. The collection type may for example be a synchronized queue or an unsynchronized
- *            list. 
- */
-public abstract class BlockChannelAccess<R extends IORequest, C extends Collection<MemorySegment>> extends ChannelAccess<MemorySegment, R>
-{	
-	/**
-	 * The lock that is used during closing to synchronize the thread that waits for all
-	 * requests to be handled with the asynchronous I/O thread.
-	 */
-	protected final Object closeLock = new Object();
-	
-	/**
-	 * An atomic integer that counts the number of buffers we still wait for to return.
-	 */
-	protected final AtomicInteger requestsNotReturned = new AtomicInteger(0);
-	
-	/**
-	 * The collection gathering the processed buffers that are ready to be (re)used.
-	 */
-	protected final C returnBuffers;
-	
-	/**
-	 * Flag marking this channel as closed;
-	 */
-	protected volatile boolean closed;
-
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Creates a new channel access to the path indicated by the given ID. The channel accepts buffers to be
-	 * read/written and hands them to the asynchronous I/O thread. After being processed, the buffers 
-	 * are returned by adding the to the given queue.
-	 * 
-	 * @param channelID The id describing the path of the file that the channel accessed.
-	 * @param requestQueue The queue that this channel hands its IO requests to.
-	 * @param returnQueue The queue to which the segments are added after their buffer was written.
-	 * @param writeEnabled Flag describing whether the channel should be opened in read/write mode, rather
-	 *                     than in read-only mode.
-	 * @throws IOException Thrown, if the channel could no be opened.
-	 */
-	protected BlockChannelAccess(Channel.ID channelID, RequestQueue<R> requestQueue,
-			C returnQueue, boolean writeEnabled)
-	throws IOException
-	{
-		super(channelID, requestQueue, writeEnabled);
-		
-		if (requestQueue == null) {
-			throw new NullPointerException();
-		}
-		
-		this.returnBuffers = returnQueue;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Gets the queue (or list) to which the asynchronous reader adds its elements.
-	 * 
-	 * @return The queue (or list) to which the asynchronous reader adds its elements.
-	 */
-	public C getReturnQueue()
-	{
-		return this.returnBuffers;
-	}
-	
-
-	@Override
-	public boolean isClosed()
-	{
-		return this.closed;
-	}
-	
-	/**
-	 * Closes the reader and waits until all pending asynchronous requests are
-	 * handled. Even if an exception interrupts the closing, the underlying <tt>FileChannel</tt> is
-	 * closed.
-	 * 
-	 * @throws IOException Thrown, if an I/O exception occurred while waiting for the buffers, or if
-	 *                     the closing was interrupted.
-	 */
-	public void close() throws IOException
-	{
-		// atomically set the close flag
-		synchronized (this.closeLock) {
-			if (this.closed) {
-				return;
-			}
-			this.closed = true;
-			
-			try {
-				// wait until as many buffers have been returned as were written
-				// only then is everything guaranteed to be consistent.{
-				while (this.requestsNotReturned.get() > 0) {
-					try {
-						// we add a timeout here, because it is not guaranteed that the
-						// decrementing during buffer return and the check here are deadlock free.
-						// the deadlock situation is however unlikely and caught by the timeout
-						this.closeLock.wait(1000);
-						checkErroneous();
-					}
-					catch (InterruptedException iex) {}
-				}
-			}
-			finally {
-				// close the file
-				if (this.fileChannel.isOpen()) {
-					this.fileChannel.close();
-				}
-			}
-		}
-	}
-	
-	/**
-	 * This method waits for all pending asynchronous requests to return. When the
-	 * last request has returned, the channel is closed and deleted.
-	 * 
-	 * Even if an exception interrupts the closing, such that not all request are handled,
-	 * the underlying <tt>FileChannel</tt> is closed and deleted.
-	 * 
-	 * @throws IOException Thrown, if an I/O exception occurred while waiting for the buffers, or if
-	 *                     the closing was interrupted.
-	 */
-	public void closeAndDelete() throws IOException
-	{
-		try {
-			close();
-		}
-		finally {
-			deleteChannel();
-		}
-	}
-	
-
-	@Override
-	protected void returnBuffer(MemorySegment buffer)
-	{
-		this.returnBuffers.add(buffer);
-		
-		// decrement the number of missing buffers. If we are currently closing, notify the 
-		if (this.closed) {
-			synchronized (this.closeLock) {
-				int num = this.requestsNotReturned.decrementAndGet();
-				if (num == 0) {
-					this.closeLock.notifyAll();
-				}
-			}
-		}
-		else {
-			this.requestsNotReturned.decrementAndGet();
-		}
-	}
-}
-
-//--------------------------------------------------------------------------------------------
-
-/**
- * Special read request that reads an entire memory segment from a block reader.
- */
-final class SegmentReadRequest implements ReadRequest
-{
-	private final BlockChannelAccess<ReadRequest, ?> channel;
-	
-	private final MemorySegment segment;
-	
-	protected SegmentReadRequest(BlockChannelAccess<ReadRequest, ?> targetChannel, MemorySegment segment)
-	{
-		this.channel = targetChannel;
-		this.segment = segment;
-	}
-
-
-	@Override
-	public void read() throws IOException
-	{
-		final FileChannel c = this.channel.fileChannel;
-		if (c.size() - c.position() > 0) {
-			try {
-				final ByteBuffer wrapper = this.segment.wrap(0, this.segment.size());
-				this.channel.fileChannel.read(wrapper);
-			} catch (NullPointerException npex) {
-				// the memory has been cleared asynchronouosly through task failing or canceling
-				// ignore the request, since the result cannot be read
-			}
-		}
-	}
-
-
-	@Override
-	public void requestDone(IOException ioex)
-	{
-		this.channel.handleProcessedBuffer(this.segment, ioex);
-	}
-}
-
-//--------------------------------------------------------------------------------------------
-
-/**
- * Special write request that writes an entire memory segment to the block writer.
- */
-final class SegmentWriteRequest implements WriteRequest
-{
-	private final BlockChannelAccess<WriteRequest, ?> channel;
-	
-	private final MemorySegment segment;
-	
-	protected SegmentWriteRequest(BlockChannelAccess<WriteRequest, ?> targetChannel, MemorySegment segment)
-	{
-		this.channel = targetChannel;
-		this.segment = segment;
-	}
-
-
-	@Override
-	public void write() throws IOException
-	{
-		try {
-			this.channel.fileChannel.write(this.segment.wrap(0, this.segment.size()));
-		} catch (NullPointerException npex) {
-			// the memory has been cleared asynchronouosly through task failing or canceling
-			// ignore the request, since there is nothing to write.
-		}
-	}
-
-
-	@Override
-	public void requestDone(IOException ioex)
-	{
-		this.channel.handleProcessedBuffer(this.segment, ioex);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java
index f674ad4..f25827a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java
@@ -16,76 +16,30 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.disk.iomanager;
 
 import java.io.IOException;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.flink.core.memory.MemorySegment;
 
-
 /**
  * A reader that reads data in blocks from a file channel. The reader reads the blocks into a 
- * {@link org.apache.flink.core.memory.MemorySegment} in an asynchronous fashion. That is, a read
- * request is not processed by the thread that issues it, but by an asynchronous reader thread. Once the read request
- * is done, the asynchronous reader adds the full MemorySegment to a <i>return queue</i> where it can be popped by the
- * worker thread, once it needs the data. The return queue is in this case a
- * {@link java.util.concurrent.LinkedBlockingQueue}, such that the working thread blocks until the request has been served,
- * if the request is still pending when the it requires the data. 
- * <p>
- * Typical pre-fetching reads are done by issuing the read requests early and popping the return queue once the data
- * is actually needed.
- * <p>
- * The reader has no notion whether the size of the memory segments is actually the size of the blocks on disk,
- * or even whether the file was written in blocks of the same size, or in blocks at all. Ensuring that the
- * writing and reading is consistent with each other (same blocks sizes) is up to the programmer.  
+ * {@link org.apache.flink.core.memory.MemorySegment}. To support asynchronous implementations,
+ * the read method does not immediately return the full memory segment, but rather adds it to
+ * a blocking queue of finished read operations.
  */
-public class BlockChannelReader extends BlockChannelAccess<ReadRequest, LinkedBlockingQueue<MemorySegment>>
-{
-	/**
-	 * Creates a new block channel reader for the given channel.
-	 *  
-	 * @param channelID The ID of the channel to read.
-	 * @param requestQueue The request queue of the asynchronous reader thread, to which the I/O requests
-	 *                     are added.
-	 * @param returnSegments The return queue, to which the full Memory Segments are added.
-	 * @throws IOException Thrown, if the underlying file channel could not be opened.
-	 */
-	protected BlockChannelReader(Channel.ID channelID, RequestQueue<ReadRequest> requestQueue,
-			LinkedBlockingQueue<MemorySegment> returnSegments, int numRequestsToBundle)
-	throws IOException
-	{
-		super(channelID, requestQueue, returnSegments, false);
-	}	
+public interface BlockChannelReader extends FileIOChannel {
 
 	/**
-	 * Issues a read request, which will asynchronously fill the given segment with the next block in the
+	 * Issues a read request, which will fill the given segment with the next block in the
 	 * underlying file channel. Once the read request is fulfilled, the segment will be added to this reader's
 	 * return queue.
 	 *  
 	 * @param segment The segment to read the block into.
-	 * @throws IOException Thrown, when the reader encounters an I/O error. Due to the asynchronous nature of the
-	 *                     reader, the exception thrown here may have been caused by an earlier read request. 
+	 * @throws IOException Thrown, when the reader encounters an I/O error.
 	 */
-	public void readBlock(MemorySegment segment) throws IOException
-	{
-		// check the error state of this channel
-		checkErroneous();
-		
-		// write the current buffer and get the next one
-		// the statements have to be in this order to avoid incrementing the counter
-		// after the channel has been closed
-		this.requestsNotReturned.incrementAndGet();
-		if (this.closed || this.requestQueue.isClosed()) {
-			// if we found ourselves closed after the counter increment,
-			// decrement the counter again and do not forward the request
-			this.requestsNotReturned.decrementAndGet();
-			throw new IOException("The reader has been closed.");
-		}
-		this.requestQueue.add(new SegmentReadRequest(this, segment));
-	}
+	void readBlock(MemorySegment segment) throws IOException;
 	
 	/**
 	 * Gets the next memory segment that has been filled with data by the reader. This method blocks until
@@ -98,22 +52,13 @@ public class BlockChannelReader extends BlockChannelAccess<ReadRequest, LinkedBl
 	 * @return The next memory segment from the reader's return queue.
 	 * @throws IOException Thrown, if an I/O error occurs in the reader while waiting for the request to return.
 	 */
-	public MemorySegment getNextReturnedSegment() throws IOException
-	{
-		try {
-			while (true) {
-				final MemorySegment next = this.returnBuffers.poll(2000, TimeUnit.MILLISECONDS);
-				if (next != null) {
-					return next;
-				} else {
-					if (this.closed) {
-						throw new IOException("The reader has been asynchronously closed.");
-					}
-					checkErroneous();
-				}
-			}
-		} catch (InterruptedException iex) {
-			throw new IOException("Reader was interrupted while waiting for the next returning segment.");
-		}
-	}
+	public MemorySegment getNextReturnedSegment() throws IOException;
+	
+	/**
+	 * Gets the queue in which the full memory segments are queued after the read is complete.
+	 * 
+	 * @return The queue with the full memory segments.
+	 */
+	LinkedBlockingQueue<MemorySegment> getReturnQueue();
 }
+	
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriter.java
index 44a2edb..25c74e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriter.java
@@ -16,101 +16,40 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.disk.iomanager;
 
-
 import java.io.IOException;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.flink.core.memory.MemorySegment;
 
-
 /**
  * A writer that writes data in blocks to a file channel. The writer receives the data blocks in the form of 
  * {@link org.apache.flink.core.memory.MemorySegment}, which it writes entirely to the channel,
- * regardless of how space in the segment is used. The writing happens in an asynchronous fashion. That is, a write
- * request is not processed by the thread that issues it, but by an asynchronous writer thread. Once the request
- * is done, the asynchronous writer adds the MemorySegment to a <i>return queue</i> where it can be popped by the
- * worker thread, to be reused. The return queue is in this case a
- * {@link java.util.concurrent.LinkedBlockingQueue}, such that the working thread blocks until the request has been served,
- * if the request is still pending when the it requires the segment back. 
- * <p>
- * Typical write behind is realized, by having a small set of segments in the return queue at all times. When a
- * memory segment must be written, the request is issued to the writer and a new segment is immediately popped from
- * the return queue. Once too many requests have been issued and the I/O thread cannot keep up, the working thread
- * naturally blocks until another segment is available again.
+ * regardless of how space in the segment is used. The writing may be realized synchronously, or asynchronously,
+ * depending on the implementation.
  */
-public class BlockChannelWriter extends BlockChannelAccess<WriteRequest, LinkedBlockingQueue<MemorySegment>>
-{
-	/**
-	 * Creates a new block channel writer for the given channel.
-	 *  
-	 * @param channelID The ID of the channel to write to.
-	 * @param requestQueue The request queue of the asynchronous writer thread, to which the I/O requests
-	 *                     are added.
-	 * @param returnSegments The return queue, to which the processed Memory Segments are added.
-	 * @throws IOException Thrown, if the underlying file channel could not be opened exclusively.
-	 */
-	protected BlockChannelWriter(Channel.ID channelID, RequestQueue<WriteRequest> requestQueue,
-			LinkedBlockingQueue<MemorySegment> returnSegments, int numRequestsToBundle)
-	throws IOException
-	{
-		super(channelID, requestQueue, returnSegments, true);
-	}
-
-	/**
-	 * Issues a asynchronous write request to the writer.
-	 * 
-	 * @param segment The segment to be written.
-	 * @throws IOException Thrown, when the writer encounters an I/O error. Due to the asynchronous nature of the
-	 *                     writer, the exception thrown here may have been caused by an earlier write request. 
-	 */
-	public void writeBlock(MemorySegment segment) throws IOException
-	{
-		// check the error state of this channel
-		checkErroneous();
-		
-		// write the current buffer and get the next one
-		this.requestsNotReturned.incrementAndGet();
-		if (this.closed || this.requestQueue.isClosed()) {
-			// if we found ourselves closed after the counter increment,
-			// decrement the counter again and do not forward the request
-			this.requestsNotReturned.decrementAndGet();
-			throw new IOException("The writer has been closed.");
-		}
-		this.requestQueue.add(new SegmentWriteRequest(this, segment));
-	}
+public interface BlockChannelWriter extends BlockChannelWriterWithCallback {
 	
 	/**
 	 * Gets the next memory segment that has been written and is available again.
 	 * This method blocks until such a segment is available, or until an error occurs in the writer, or the
 	 * writer is closed.
 	 * <p>
-	 * WARNING: If this method is invoked without any segment ever returning (for example, because the
-	 * {@link #writeBlock(MemorySegment)} method has not been invoked appropriately), the method may block
+	 * NOTE: If this method is invoked without any segment ever returning (for example, because the
+	 * {@link #writeBlock(MemorySegment)} method has not been invoked accordingly), the method may block
 	 * forever.
 	 * 
 	 * @return The next memory segment from the writers's return queue.
 	 * @throws IOException Thrown, if an I/O error occurs in the writer while waiting for the request to return.
 	 */
-	public MemorySegment getNextReturnedSegment() throws IOException
-	{
-		try {
-			while (true) {
-				final MemorySegment next = this.returnBuffers.poll(2000, TimeUnit.MILLISECONDS);
-				if (next != null) {
-					return next;
-				} else {
-					if (this.closed) {
-						throw new IOException("The writer has been closed.");
-					}
-					checkErroneous();
-				}
-			}
-		} catch (InterruptedException iex) {
-			throw new IOException("Writer was interrupted while waiting for the next returning segment.");
-		}
-	}
+	MemorySegment getNextReturnedSegment() throws IOException;
+	
+	/**
+	 * Gets the queue in which the memory segments are queued after the asynchronous write
+	 * is completed
+	 * 
+	 * @return The queue with the written memory segments.
+	 */
+	LinkedBlockingQueue<MemorySegment> getReturnQueue();
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java
new file mode 100644
index 0000000..57bc7e0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+public interface BlockChannelWriterWithCallback extends FileIOChannel {
+	
+	/**
+	 * Writes the given memory segment. The request may be executed synchronously, or asynchronously, depending
+	 * on the implementation.
+	 * 
+	 * @param segment The segment to be written.
+	 * @throws IOException Thrown, when the writer encounters an I/O error.
+	 */
+	void writeBlock(MemorySegment segment) throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BulkBlockChannelReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BulkBlockChannelReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BulkBlockChannelReader.java
index 3be85d1..84883e7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BulkBlockChannelReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BulkBlockChannelReader.java
@@ -16,71 +16,16 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.disk.iomanager;
 
-import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.core.memory.MemorySegment;
 
-
 /**
  *
- *
  */
-public class BulkBlockChannelReader extends BlockChannelAccess<ReadRequest, ArrayList<MemorySegment>>
-{
+public interface BulkBlockChannelReader extends FileIOChannel {
 	
-	
-	protected BulkBlockChannelReader(Channel.ID channelID, RequestQueue<ReadRequest> requestQueue, 
-			List<MemorySegment> sourceSegments, int numBlocks)
-	throws IOException
-	{
-		super(channelID, requestQueue, new ArrayList<MemorySegment>(numBlocks), false);
-		
-		// sanity check
-		if (sourceSegments.size() < numBlocks) {
-			throw new IllegalArgumentException("The list of source memory segments must contain at least" +
-					" as many segments as the number of blocks to read.");
-		}
-		
-		// send read requests for all blocks
-		for (int i = 0; i < numBlocks; i++) {
-			readBlock(sourceSegments.remove(sourceSegments.size() - 1));
-		}
-	}
-	
-
-	
-	private void readBlock(MemorySegment segment) throws IOException
-	{
-		// check the error state of this channel
-		checkErroneous();
-		
-		// write the current buffer and get the next one
-		this.requestsNotReturned.incrementAndGet();
-		if (this.closed || this.requestQueue.isClosed()) {
-			// if we found ourselves closed after the counter increment,
-			// decrement the counter again and do not forward the request
-			this.requestsNotReturned.decrementAndGet();
-			throw new IOException("The reader has been closed.");
-		}
-		this.requestQueue.add(new SegmentReadRequest(this, segment));
-	}
-	
-	public List<MemorySegment> getFullSegments()
-	{
-		synchronized (this.closeLock) {
-			if (!this.isClosed() || this.requestsNotReturned.get() > 0) {
-				throw new IllegalStateException("Full segments can only be obtained after the reader was properly closed.");
-			}
-		}
-		
-		return this.returnBuffers;
-	}
-
+	List<MemorySegment> getFullSegments();
 }
-
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/Channel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/Channel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/Channel.java
deleted file mode 100644
index 7e64e79..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/Channel.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.disk.iomanager;
-
-import java.io.File;
-import java.util.Random;
-
-import org.apache.flink.util.StringUtils;
-
-/**
- * A Channel represents a collection of files that belong logically to the same resource. An example is a collection of
- * files that contain sorted runs of data from the same stream, that will later on be merged together.
- * 
- */
-public final class Channel
-{
-	private static final int RANDOM_BYTES_LENGTH = 16;
-
-	/**
-	 * An ID identifying an underlying fileChannel.
-	 * 
-	 */
-	public static class ID
-	{
-		private final String path;
-		
-		private final int threadNum;
-
-		protected ID(final String path, final int threadNum) {
-			this.path = path;
-			this.threadNum = threadNum;
-		}
-
-		protected ID(final String basePath, final int threadNum, final Random random)
-		{
-			this.path = basePath + File.separator + randomString(random) + ".channel";
-			this.threadNum = threadNum;
-		}
-
-		/**
-		 * Returns the path to the underlying temporary file.
-		 */
-		public String getPath() {
-			return path;
-		}
-		
-		int getThreadNum() {
-			return this.threadNum;
-		}
-
-		public String toString() {
-			return path;
-		}
-	}
-
-	public static final class Enumerator
-	{
-		private static final String FORMAT = "%s%s%s.%06d.channel";
-
-		private final String[] paths;
-		
-		private final String namePrefix;
-
-		private int counter;
-
-		protected Enumerator(final String[] basePaths, final Random random)
-		{
-			this.paths = basePaths;
-			this.namePrefix = randomString(random);
-			this.counter = 0;
-		}
-
-		public ID next()
-		{
-			final int threadNum = counter % paths.length;
-			return new ID(String.format(FORMAT, this.paths[threadNum], File.separator, namePrefix, (counter++)), threadNum);
-		}
-	}
-
-	/**
-	 * Creates a random byte sequence using the provided {@code random} generator and returns its hex representation.
-	 * 
-	 * @param random
-	 *        The random number generator to be used.
-	 * @return A hex representation of the generated byte sequence
-	 */
-	private static final String randomString(final Random random) {
-		final byte[] bytes = new byte[RANDOM_BYTES_LENGTH];
-		random.nextBytes(bytes);
-		return StringUtils.byteToHexString(bytes);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelAccess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelAccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelAccess.java
deleted file mode 100644
index 2b5b34d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelAccess.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.disk.iomanager;
-
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
-
-
-/**
- * A base class for readers and writers that read data from I/O manager channels, or write data to them.
- * Requests handled by channels that inherit from this class are executed asynchronously, which allows
- * write-behind for writers and pre-fetching for readers.
- * 
- * 
- * @param <T> The buffer type used for the underlying IO operations.
- */
-public abstract class ChannelAccess<T, R extends IORequest>
-{
-	/**
-	 * The ID of the underlying channel.
-	 */
-	protected final Channel.ID id;
-
-	/**
-	 * A file channel for NIO access to the file.
-	 */
-	protected final FileChannel fileChannel;
-	
-	/**
-	 * A request queue for submitting asynchronous requests to the corresponding
-	 * IO worker thread.
-	 */
-	protected final RequestQueue<R> requestQueue;
-	
-	/**
-	 * An exception that was encountered by the asynchronous request handling thread.
-	 */
-	protected volatile IOException exception;
-	
-	
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Creates a new channel to the path indicated by the given ID. The channel hands IO requests to
-	 * the given request queue to be processed.
-	 * 
-	 * @param channelID The id describing the path of the file that the channel accessed.
-	 * @param requestQueue The queue that this channel hands its IO requests to.
-	 * @param writeEnabled Flag describing whether the channel should be opened in read/write mode, rather
-	 *                     than in read-only mode.
-	 * @throws IOException Thrown, if the channel could no be opened.
-	 */
-	protected ChannelAccess(Channel.ID channelID, RequestQueue<R> requestQueue, boolean writeEnabled)
-	throws IOException
-	{
-		if (channelID == null || requestQueue == null) {
-			throw new NullPointerException();
-		}
-		
-		this.id = channelID;
-		this.requestQueue = requestQueue;
-		
-		try {
-			@SuppressWarnings("resource")
-			RandomAccessFile file = new RandomAccessFile(id.getPath(), writeEnabled ? "rw" : "r");
-			this.fileChannel = file.getChannel();
-		}
-		catch (IOException e) {
-			throw new IOException("Channel to path '" + channelID.getPath() + "' could not be opened.", e);
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Checks, whether this channel has been closed;
-	 * 
-	 * @return True, if the channel has been closed, false otherwise.
-	 */
-	public abstract boolean isClosed();
-	
-	/**
-	 * This method is invoked by the asynchronous I/O thread to return a buffer after the I/O request
-	 * completed.
-	 * 
-	 * @param buffer The buffer to be returned.
-	 */
-	protected abstract void returnBuffer(T buffer); 
-	
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Gets the channel ID of this channel.
-	 * 
-	 * @return This channel's ID.
-	 */
-	public final Channel.ID getChannelID()
-	{
-		return this.id;
-	}
-	
-	/**
-	 * Checks the exception state of this channel. The channel is erroneous, if one of its requests could not
-	 * be processed correctly.
-	 * 
-	 * @throws IOException Thrown, if the channel is erroneous. The thrown exception contains the original exception
-	 *                     that defined the erroneous state as its cause.
-	 */
-	public final void checkErroneous() throws IOException
-	{
-		if (this.exception != null) {
-			throw new IOException("The channel is erroneous.", this.exception);
-		}
-	}
-	
-	/**
-	 * Deletes this channel by physically removing the file beneath it.
-	 * This method may only be called on a closed channel.
-	 */
-	public void deleteChannel()
-	{
-		if (this.fileChannel.isOpen()) {
-			throw new IllegalStateException("Cannot delete a channel that is open.");
-		}
-	
-		// make a best effort to delete the file. Don't report exceptions.
-		try {
-			File f = new File(this.id.getPath());
-			if (f.exists()) {
-				f.delete();
-			}
-		} catch (Throwable t) {}
-	}
-	
-	/**
-	 * Handles a processed <tt>Buffer</tt>. This method is invoked by the
-	 * asynchronous IO worker threads upon completion of the IO request with the
-	 * provided buffer and/or an exception that occurred while processing the request
-	 * for that buffer.
-	 * 
-	 * @param buffer The buffer to be processed.
-	 * @param ex The exception that occurred in the I/O threads when processing the buffer's request.
-	 */
-	final void handleProcessedBuffer(T buffer, IOException ex) {
-		
-		if (ex != null && this.exception == null) {
-			this.exception = ex;
-		}
-		
-		returnBuffer(buffer);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
index 25aa289..d85ec82 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
@@ -164,8 +164,7 @@ public class ChannelReaderInputView extends AbstractPagedInputView {
 	 * @return A list containing all memory segments originally supplied to this view.
 	 * @throws IOException Thrown, if the underlying reader could not be properly closed.
 	 */
-	public List<MemorySegment> close() throws IOException
-	{	
+	public List<MemorySegment> close() throws IOException {	
 		if (this.closed) {
 			throw new IllegalStateException("Already closed.");
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
index f230333..9824d34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
@@ -81,8 +81,7 @@ public final class ChannelWriterOutputView extends AbstractPagedOutputView {
 	 * @param memory The memory used to buffer data, or null, to utilize solely the return queue.
 	 * @param segmentSize The size of the memory segments.
 	 */
-	public ChannelWriterOutputView(BlockChannelWriter writer, List<MemorySegment> memory, int segmentSize)
-	{
+	public ChannelWriterOutputView(BlockChannelWriter writer, List<MemorySegment> memory, int segmentSize) {
 		super(segmentSize, HEADER_LENGTH);
 		
 		if (writer == null) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
new file mode 100644
index 0000000..7c9d31b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.util.StringUtils;
+
+/**
+ * A Channel represents a collection of files that belong logically to the same resource. An example is a collection of
+ * files that contain sorted runs of data from the same stream, that will later on be merged together.
+ */
+public interface FileIOChannel {
+	
+	/**
+	 * Gets the channel ID of this I/O channel.
+	 * 
+	 * @return The channel ID.
+	 */
+	FileIOChannel.ID getChannelID();
+	
+	/**
+	 * Checks whether the channel has been closed.
+	 * 
+	 * @return True if the channel has been closed, false otherwise.
+	 */
+	boolean isClosed();
+
+	/**
+	* Closes the channel. For asynchronous implementations, this method waits until all pending requests are
+	* handled. Even if an exception interrupts the closing, the underlying <tt>FileChannel</tt> is closed.
+	* 
+	* @throws IOException Thrown, if an error occurred while waiting for pending requests.
+	*/
+	void close() throws IOException;
+
+	/**
+	 * Deletes the file underlying this I/O channel.
+	 *  
+	 * @throws IllegalStateException Thrown, when the channel is still open.
+	 */
+	void deleteChannel();
+	
+	/**
+	* Closes the channel and deletes the underlying file.
+	* For asynchronous implementations, this method waits until all pending requests are handled;
+	* 
+	* @throws IOException Thrown, if an error occurred while waiting for pending requests.
+	*/
+	public void closeAndDelete() throws IOException;
+	
+	// --------------------------------------------------------------------------------------------
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * An ID identifying an underlying file channel.
+	 */
+	public static class ID {
+		
+		private static final int RANDOM_BYTES_LENGTH = 16;
+		
+		private final String path;
+		
+		private final int threadNum;
+
+		protected ID(String path, int threadNum) {
+			this.path = path;
+			this.threadNum = threadNum;
+		}
+
+		protected ID(String basePath, int threadNum, Random random) {
+			this.path = basePath + File.separator + randomString(random) + ".channel";
+			this.threadNum = threadNum;
+		}
+
+		/**
+		 * Returns the path to the underlying temporary file.
+		 */
+		public String getPath() {
+			return path;
+		}
+		
+		int getThreadNum() {
+			return this.threadNum;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj instanceof ID) {
+				ID other = (ID) obj;
+				return this.path.equals(other.path) && this.threadNum == other.threadNum;
+			} else {
+				return false;
+			}
+		}
+		
+		@Override
+		public int hashCode() {
+			return path.hashCode();
+		}
+		
+		@Override
+		public String toString() {
+			return path;
+		}
+		
+		private static final String randomString(final Random random) {
+			final byte[] bytes = new byte[RANDOM_BYTES_LENGTH];
+			random.nextBytes(bytes);
+			return StringUtils.byteToHexString(bytes);
+		}
+	}
+
+	/**
+	 * An enumerator for channels that logically belong together.
+	 */
+	public static final class Enumerator {
+		
+		private static final String FORMAT = "%s%s%s.%06d.channel";
+
+		private final String[] paths;
+		
+		private final String namePrefix;
+
+		private int counter;
+
+		protected Enumerator(String[] basePaths, Random random) {
+			this.paths = basePaths;
+			this.namePrefix = ID.randomString(random);
+			this.counter = 0;
+		}
+
+		public ID next() {
+			final int threadNum = counter % paths.length;
+			return new ID(String.format(FORMAT, this.paths[threadNum], File.separator, namePrefix, (counter++)), threadNum);
+		}
+	}
+}