Posted to commits@flink.apache.org by uc...@apache.org on 2015/03/18 17:48:55 UTC

[06/13] flink git commit: [FLINK-1350] [runtime] Add blocking result partition variant

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
index e69ef17..c0be611 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
@@ -300,7 +300,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 	/**
 	 * The reader for the spilled-file of the probe partition that is currently read.
 	 */
-	private BlockChannelReader currentSpilledProbeSide;
+	private BlockChannelReader<MemorySegment> currentSpilledProbeSide;
 	
 	/**
 	 * The channel enumerator that is used while processing the current partition to create
@@ -802,7 +802,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 			segments.add(getNextBuffer());
 			segments.add(getNextBuffer());
 			
-			final BlockChannelReader inReader = this.ioManager.createBlockChannelReader(p.getBuildSideChannel().getChannelID());
+			final BlockChannelReader<MemorySegment> inReader = this.ioManager.createBlockChannelReader(p.getBuildSideChannel().getChannelID());
 			final ChannelReaderInputView inView = new HeaderlessChannelReaderInputView(inReader, segments,
 						p.getBuildSideBlockCount(), p.getLastSegmentLimit(), false);
 			final ChannelReaderInputViewIterator<BT> inIter = new ChannelReaderInputViewIterator<BT>(inView, 
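
Every runtime change in this part of the commit is the same generification: the block channel readers and writers now carry the type of block they transport, so page-based spill code is spelled BlockChannelWriter<MemorySegment> and BlockChannelReader<MemorySegment>. A minimal sketch of the write-then-read round trip behind these call sites, mirroring the test code further down in this commit; the MemoryManager ("memoryManager"), the owning task ("parentTask"), and the sizes are assumed context, not part of the diff:

    // Spill a record to a fresh channel through the generified writer.
    IOManager ioManager = new IOManagerAsync();
    FileIOChannel.ID channel = ioManager.createChannel();

    List<MemorySegment> memory = memoryManager.allocatePages(parentTask, numSegments);
    BlockChannelWriter<MemorySegment> writer = ioManager.createBlockChannelWriter(channel);
    ChannelWriterOutputView out = new ChannelWriterOutputView(writer, memory, pageSize);
    new StringValue("some spilled record").write(out);
    out.close();

    // Read it back through the generified reader.
    memory = memoryManager.allocatePages(parentTask, numSegments);
    BlockChannelReader<MemorySegment> reader = ioManager.createBlockChannelReader(channel);
    ChannelReaderInputView in = new ChannelReaderInputView(reader, memory, out.getBlockCount(), true);
    StringValue record = new StringValue();
    record.read(in);
    in.close();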

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableHashPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableHashPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableHashPartition.java
index 56dcfae..84868ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableHashPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableHashPartition.java
@@ -37,7 +37,7 @@ public class ReOpenableHashPartition<BT, PT> extends HashPartition<BT, PT> {
 
 	private FileIOChannel.ID initialBuildSideChannel = null;			// path to initial build side contents (only for in-memory partitions)
 	
-	private BlockChannelWriter initialBuildSideWriter = null;
+	private BlockChannelWriter<MemorySegment> initialBuildSideWriter = null;
 
 	private boolean isRestored = false;							// marks a restored partition
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
index 3466024..d05bd9a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
@@ -301,7 +301,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 				}
 
 				// create writer
-				final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel);
+				final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
 				registerOpenChannelToBeRemovedAtShudown(writer);
 				final ChannelWriterOutputView output = new ChannelWriterOutputView(writer, this.writeMemory,
 																			this.memManager.getPageSize());
@@ -457,7 +457,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 			// create a new channel writer
 			final FileIOChannel.ID mergedChannelID = this.ioManager.createChannel();
 			registerChannelToBeRemovedAtShudown(mergedChannelID);
-			final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(mergedChannelID);
+			final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(mergedChannelID);
 			registerOpenChannelToBeRemovedAtShudown(writer);
 			final ChannelWriterOutputView output = new ChannelWriterOutputView(writer, writeBuffers, 
 																			this.memManager.getPageSize());

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
index a938e98..388b7b9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
@@ -1315,7 +1315,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 				registerChannelToBeRemovedAtShudown(channel);
 
 				// create writer
-				final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel);
+				final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
 				registerOpenChannelToBeRemovedAtShudown(writer);
 				final ChannelWriterOutputView output = new ChannelWriterOutputView(writer, this.writeMemory,
 																			this.memManager.getPageSize());
@@ -1493,7 +1493,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 				final List<MemorySegment> segsForChannel = inputSegments.get(i);
 				
 				// create a reader. if there are multiple segments for the reader, issue multiple together per I/O request
-				final BlockChannelReader reader = this.ioManager.createBlockChannelReader(channel.getChannel());
+				final BlockChannelReader<MemorySegment> reader = this.ioManager.createBlockChannelReader(channel.getChannel());
 					
 				readerList.add(reader);
 				registerOpenChannelToBeRemovedAtShudown(reader);
@@ -1578,7 +1578,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 			// create a new channel writer
 			final FileIOChannel.ID mergedChannelID = this.ioManager.createChannel();
 			registerChannelToBeRemovedAtShudown(mergedChannelID);
-			final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(mergedChannelID);
+			final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(mergedChannelID);
 			registerOpenChannelToBeRemovedAtShudown(writer);
 			final ChannelWriterOutputView output = new ChannelWriterOutputView(writer, writeBuffers, 
 																			this.memManager.getPageSize());

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index e3e4175..8cebc6c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -23,14 +23,15 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.RuntimeEnvironment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
-import org.apache.flink.runtime.io.network.partition.IntermediateResultPartition;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.messages.ExecutionGraphMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.TaskManagerMessages.UnregisterTask;
 import org.apache.flink.runtime.profiling.TaskManagerProfiler;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
@@ -73,6 +74,8 @@ public class Task {
 	/** The current execution state of the task */
 	private volatile ExecutionState executionState = ExecutionState.DEPLOYING;
 
+	private volatile Throwable failureCause;
+
 	// --------------------------------------------------------------------------------------------	
 
 	public Task(JobID jobId, JobVertexID vertexId, int taskIndex, int parallelism,
@@ -160,6 +163,10 @@ public class Task {
 		}
 	}
 
+	public Throwable getFailureCause() {
+		return failureCause;
+	}
+
 	// ----------------------------------------------------------------------------------------------------------------
 	//  States and Transitions
 	// ----------------------------------------------------------------------------------------------------------------
@@ -174,7 +181,7 @@ public class Task {
 	public boolean markAsFinished() {
 		if (STATE_UPDATER.compareAndSet(this, ExecutionState.RUNNING, ExecutionState.FINISHED)) {
 			notifyObservers(ExecutionState.FINISHED, null);
-			notifyExecutionStateChange(ExecutionState.FINISHED, null);
+			unregisterTask();
 			return true;
 		}
 		else {
@@ -195,8 +202,11 @@ public class Task {
 			// after all, we may have recognized our failure state before the cancelling and never sent a canceled
 			// message back
 			else if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
+				this.failureCause = error;
+
 				notifyObservers(ExecutionState.FAILED, ExceptionUtils.stringifyException(error));
-				notifyExecutionStateChange(ExecutionState.FAILED, error);
+				unregisterTask();
+
 				return;
 			}
 		}
@@ -218,7 +228,7 @@ public class Task {
 				if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) {
 
 					notifyObservers(ExecutionState.CANCELED, null);
-					notifyExecutionStateChange(ExecutionState.CANCELED, null);
+					unregisterTask();
 					return;
 				}
 			}
@@ -268,9 +278,10 @@ public class Task {
 			if (current == ExecutionState.DEPLOYING) {
 				// directly set to canceled
 				if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
+					this.failureCause = cause;
 
 					notifyObservers(ExecutionState.FAILED, null);
-					notifyExecutionStateChange(ExecutionState.FAILED, cause);
+					unregisterTask();
 					return;
 				}
 			}
@@ -284,8 +295,10 @@ public class Task {
 						LOG.error("Error while cancelling the task.", e);
 					}
 
+					this.failureCause = cause;
+
 					notifyObservers(ExecutionState.FAILED, null);
-					notifyExecutionStateChange(ExecutionState.FAILED, cause);
+					unregisterTask();
 
 					return;
 				}
@@ -309,7 +322,7 @@ public class Task {
 
 			if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) {
 				notifyObservers(ExecutionState.CANCELED, null);
-				notifyExecutionStateChange(ExecutionState.CANCELED, null);
+				unregisterTask();
 				return;
 			}
 		}
@@ -339,6 +352,10 @@ public class Task {
 		}
 	}
 
+	protected void unregisterTask() {
+		taskManager.tell(new UnregisterTask(executionId), ActorRef.noSender());
+	}
+
 	protected void notifyExecutionStateChange(ExecutionState executionState,
 											Throwable optionalError) {
 		LOG.info("Update execution state of {} ({}) to {}.", this.getTaskName(),
@@ -377,11 +394,11 @@ public class Task {
 		return environment != null ? environment.getAllInputGates() : null;
 	}
 
-	public BufferWriter[] getWriters() {
+	public ResultPartitionWriter[] getWriters() {
 		return environment != null ? environment.getAllWriters() : null;
 	}
 
-	public IntermediateResultPartition[] getProducedPartitions() {
+	public ResultPartition[] getProducedPartitions() {
 		return environment != null ? environment.getProducedPartitions() : null;
 	}
 
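
The Task changes are more than renames: every terminal transition (FINISHED, CANCELED, FAILED) now goes through a compareAndSet on the volatile execution state, records the failure cause before observers hear about it, and replaces the direct notifyExecutionStateChange call with a single UnregisterTask message to the TaskManager actor, which later reports the final state (see TaskManager.scala below). The transition logic itself is the standard atomic-field-updater pattern; a self-contained sketch of the technique (not Flink's Task class):

    import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

    class TerminalStateMachine {
        enum State { DEPLOYING, RUNNING, FINISHED, CANCELED, FAILED }

        private static final AtomicReferenceFieldUpdater<TerminalStateMachine, State> STATE_UPDATER =
                AtomicReferenceFieldUpdater.newUpdater(TerminalStateMachine.class, State.class, "state");

        private volatile State state = State.RUNNING;
        private volatile Throwable failureCause;

        // Exactly one caller wins the RUNNING -> FAILED transition, so any
        // notification and unregistration it triggers happens exactly once.
        boolean markFailed(Throwable cause) {
            if (STATE_UPDATER.compareAndSet(this, State.RUNNING, State.FAILED)) {
                this.failureCause = cause; // published before anyone is notified
                return true;
            }
            return false;
        }
    }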

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/util/AtomicDisposableReferenceCounter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/AtomicDisposableReferenceCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/AtomicDisposableReferenceCounter.java
index ee5f281..3385ace 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/AtomicDisposableReferenceCounter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/AtomicDisposableReferenceCounter.java
@@ -19,49 +19,61 @@
 package org.apache.flink.runtime.util;
 
 /**
- * Atomic reference counter, which enters a "disposed" state after the reference
- * count reaches 0.
+ * Atomic reference counter, which enters a "disposed" state after it reaches a configurable
+ * reference count (default 0).
  */
 public class AtomicDisposableReferenceCounter {
 
 	private final Object lock = new Object();
 
-	private int referenceCounter;
+	private int referenceCount;
 
 	private boolean isDisposed;
 
+	/** Enter the disposed state when the reference count reaches this number. */
+	private final int disposeOnReferenceCount;
+
+	public AtomicDisposableReferenceCounter() {
+		this.disposeOnReferenceCount = 0;
+	}
+
+	public AtomicDisposableReferenceCounter(int disposeOnReferenceCount) {
+		this.disposeOnReferenceCount = disposeOnReferenceCount;
+	}
+
 	/**
 	 * Increments the reference count and returns whether it was successful.
 	 * <p>
-	 * If the method returns <code>false</code>, the counter has already been
-	 * disposed. Otherwise it returns <code>true</code>.
+	 * If the method returns <code>false</code>, the counter has already been disposed. Otherwise it
+	 * returns <code>true</code>.
 	 */
-	public boolean incrementReferenceCounter() {
+	public boolean increment() {
 		synchronized (lock) {
 			if (isDisposed) {
 				return false;
 			}
 
-			referenceCounter++;
+			referenceCount++;
 			return true;
 		}
 	}
 
 	/**
-	 * Decrements the reference count.
+	 * Decrements the reference count and returns whether the reference counter entered the disposed
+	 * state.
 	 * <p>
-	 * If the method returns <code>true</code>, the decrement operation disposed
-	 * the counter. Otherwise it returns <code>false</code>.
+	 * If the method returns <code>true</code>, the decrement operation disposed the counter.
+	 * Otherwise it returns <code>false</code>.
 	 */
-	public boolean decrementReferenceCounter() {
+	public boolean decrement() {
 		synchronized (lock) {
 			if (isDisposed) {
 				return false;
 			}
 
-			referenceCounter--;
+			referenceCount--;
 
-			if (referenceCounter <= 0) {
+			if (referenceCount <= disposeOnReferenceCount) {
 				isDisposed = true;
 			}
 
@@ -69,9 +81,24 @@ public class AtomicDisposableReferenceCounter {
 		}
 	}
 
+	public int get() {
+		synchronized (lock) {
+			return referenceCount;
+		}
+	}
+
+	/**
+	 * Returns whether the reference count has reached the disposed state.
+	 */
+	public boolean isDisposed() {
+		synchronized (lock) {
+			return isDisposed;
+		}
+	}
+
 	public boolean disposeIfNotUsed() {
 		synchronized (lock) {
-			if(referenceCounter <= 0){
+			if (referenceCount <= disposeOnReferenceCount) {
 				isDisposed = true;
 			}
 
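
The reworked counter is a guard for shared, disposable resources: the dispose threshold is now configurable, the accessors are shorter (increment/decrement), and the disposed state is queryable. A usage sketch against exactly the API above; the guarded resource is illustrative:

    AtomicDisposableReferenceCounter refCount = new AtomicDisposableReferenceCounter();

    // Pin the resource before use; false means it was already disposed
    // and must not be touched.
    if (!refCount.increment()) {
        throw new IllegalStateException("Resource already disposed.");
    }
    try {
        // ... consume the shared resource ...
    }
    finally {
        // The decrement that reaches the threshold returns true exactly
        // once; that caller performs the actual cleanup.
        if (refCount.decrement()) {
            // release buffers, delete spill files, ...
        }
    }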

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 98dd7eb..fe66b37 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -41,7 +41,7 @@ import org.apache.flink.runtime.ActorLogMessages
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.instance.InstanceManager
-import org.apache.flink.runtime.jobgraph.{ScheduleMode,JobGraph,JobStatus,JobID}
+import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobID}
 import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager
 import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
 import org.apache.flink.runtime.messages.JobManagerMessages._
@@ -97,7 +97,7 @@ class JobManager(val configuration: Configuration,
                  val delayBetweenRetries: Long,
                  val timeout: FiniteDuration)
   extends Actor with ActorLogMessages with ActorLogging {
-  
+
   /** Reference to the log, for debugging */
   val LOG = JobManager.LOG
 
@@ -283,20 +283,20 @@ class JobManager(val configuration: Configuration,
           if(newJobStatus.isTerminalState) {
             jobInfo.end = timeStamp
 
-            // is the client waiting for the job result?
+          // is the client waiting for the job result?
             newJobStatus match {
               case JobStatus.FINISHED =>
                 val accumulatorResults = accumulatorManager.getJobAccumulatorResults(jobID)
-                jobInfo.client ! JobResultSuccess(jobID,jobInfo.duration,accumulatorResults)
+                jobInfo.client ! JobResultSuccess(jobID, jobInfo.duration, accumulatorResults)
               case JobStatus.CANCELED =>
                 jobInfo.client ! Failure(new JobCancellationException(jobID,
-                  "Job was cancelled.",error))
+                  "Job was cancelled.", error))
               case JobStatus.FAILED =>
                 jobInfo.client ! Failure(new JobExecutionException(jobID,
-                  "Job execution failed.",error))
+                  "Job execution failed.", error))
               case x =>
-                val exception = new JobExecutionException(jobID,s"$x is not a " +
-                        "terminal state.")
+                val exception = new JobExecutionException(jobID, s"$x is not a " +
+                  "terminal state.")
                 jobInfo.client ! Failure(exception)
                 throw exception
             }
@@ -321,11 +321,11 @@ class JobManager(val configuration: Configuration,
         case None =>
       }
       
-    case ScheduleOrUpdateConsumers(jobId, executionId, partitionIndex) =>
+    case ScheduleOrUpdateConsumers(jobId, partitionId) =>
       currentJobs.get(jobId) match {
         case Some((executionGraph, _)) =>
           sender ! Acknowledge
-          executionGraph.scheduleOrUpdateConsumers(executionId, partitionIndex)
+          executionGraph.scheduleOrUpdateConsumers(partitionId)
         case None =>
           log.error("Cannot find execution graph for job ID {} to schedule or update consumers",
             jobId)

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 4630089..17e9138 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.accumulators.AccumulatorEvent
 import org.apache.flink.runtime.client.JobStatusMessage
 import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph}
 import org.apache.flink.runtime.instance.{InstanceID, Instance}
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID
 import org.apache.flink.runtime.jobgraph.{JobGraph, JobID, JobStatus, JobVertexID}
 import org.apache.flink.runtime.taskmanager.TaskExecutionState
 
@@ -76,18 +77,16 @@ object JobManagerMessages {
    * <p>
    * There is a call to this method for each
    * [[org.apache.flink.runtime.executiongraph.ExecutionVertex]] instance once per produced
-   * [[org.apache.flink.runtime.io.network.partition.IntermediateResultPartition]] instance,
+   * [[org.apache.flink.runtime.io.network.partition.ResultPartition]] instance,
    * either when first producing data (for pipelined executions) or when all data has been produced
    * (for staged executions).
    * <p>
    * The [[org.apache.flink.runtime.jobmanager.JobManager]] then can decide when to schedule the
    * partition consumers of the given session.
    *
-   * @see [[org.apache.flink.runtime.io.network.partition.IntermediateResultPartition]]
+   * @see [[org.apache.flink.runtime.io.network.partition.ResultPartition]]
    */
-  case class ScheduleOrUpdateConsumers(jobId: JobID,
-                                       executionId: ExecutionAttemptID,
-                                       partitionIndex: Int)
+  case class ScheduleOrUpdateConsumers(jobId: JobID, partitionId: ResultPartitionID)
 
   case class ConsumerNotificationResult(success: Boolean, error: Option[Throwable] = None)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
index b7bb060..4e6144a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.messages
 
 import org.apache.flink.core.io.InputSplit
-import org.apache.flink.runtime.deployment.{PartitionInfo, TaskDeploymentDescriptor}
+import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, TaskDeploymentDescriptor}
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
 import org.apache.flink.runtime.instance.InstanceID
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
@@ -65,17 +65,21 @@ object TaskManagerMessages {
     def executionID: ExecutionAttemptID
   }
 
-  case class UpdateTaskSinglePartitionInfo(executionID: ExecutionAttemptID,
-                        resultId: IntermediateDataSetID,
-                        partitionInfo: PartitionInfo) extends UpdateTask
-
-  case class UpdateTaskMultiplePartitionInfos(executionID: ExecutionAttemptID,
-                                              partitionInfos: Seq[(IntermediateDataSetID,
-                                                PartitionInfo)]) extends UpdateTask
-
-  def createUpdateTaskMultiplePartitionInfos(executionID: ExecutionAttemptID,
-                                             resultIDs: java.util.List[IntermediateDataSetID],
-                                             partitionInfos: java.util.List[PartitionInfo]):
+  case class UpdateTaskSinglePartitionInfo(
+    executionID: ExecutionAttemptID,
+    resultId: IntermediateDataSetID,
+    partitionInfo: InputChannelDeploymentDescriptor)
+    extends UpdateTask
+
+  case class UpdateTaskMultiplePartitionInfos(
+    executionID: ExecutionAttemptID,
+    partitionInfos: Seq[(IntermediateDataSetID, InputChannelDeploymentDescriptor)])
+    extends UpdateTask
+
+  def createUpdateTaskMultiplePartitionInfos(
+    executionID: ExecutionAttemptID,
+    resultIDs: java.util.List[IntermediateDataSetID],
+    partitionInfos: java.util.List[InputChannelDeploymentDescriptor]):
   UpdateTaskMultiplePartitionInfos = {
     require(resultIDs.size() == partitionInfos.size(), "ResultIDs must have the same length as" +
       "partitionInfos.")

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
index aeda6c4..f99aac0 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
@@ -18,8 +18,10 @@
 
 package org.apache.flink.runtime.taskmanager
 
+import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
 import org.apache.flink.runtime.io.network.netty.NettyConfig
 
 case class NetworkEnvironmentConfiguration(numNetworkBuffers: Int,
                                            networkBufferSize: Int,
+                                           ioMode: IOMode,
                                            nettyConfig: Option[NettyConfig] = None)

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 53c45ce..61cab6b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -33,13 +33,14 @@ import org.apache.flink.runtime.ActorLogMessages
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.blob.BlobCache
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager
-import org.apache.flink.runtime.deployment.{PartitionInfo, TaskDeploymentDescriptor}
+import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, TaskDeploymentDescriptor}
 import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, FallbackLibraryCacheManager, LibraryCacheManager}
 import org.apache.flink.runtime.execution.{CancelTaskException, ExecutionState, RuntimeEnvironment}
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
 import org.apache.flink.runtime.filecache.FileCache
 import org.apache.flink.runtime.instance.{HardwareDescription, InstanceConnectionInfo, InstanceID}
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync
+import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
+import org.apache.flink.runtime.io.disk.iomanager.{IOManager, IOManagerAsync}
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.io.network.netty.NettyConfig
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
@@ -261,7 +262,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
       }
 
     case UnregisterTask(executionID) =>
-      unregisterTask(executionID)
+      unregisterTaskAndNotifyFinalState(executionID)
 
     case updateMsg:UpdateTaskExecutionState =>
       currentJobManager foreach {
@@ -348,7 +349,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
     case FailIntermediateResultPartitions(executionID) =>
       log.info("Fail intermediate result partitions associated with execution {}.", executionID)
       networkEnvironment foreach {
-        _.getPartitionManager.failIntermediateResultPartitions(executionID)
+        _.getPartitionManager.releasePartitionsProducedBy(executionID)
       }
 
     case BarrierReq(attemptID, checkpointID) =>
@@ -549,8 +550,9 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
     registrationDuration = 0 seconds
   }
 
-  private def updateTask(executionId: ExecutionAttemptID,
-                         partitionInfos: Seq[(IntermediateDataSetID, PartitionInfo)]): Unit = {
+  private def updateTask(
+    executionId: ExecutionAttemptID,
+    partitionInfos: Seq[(IntermediateDataSetID, InputChannelDeploymentDescriptor)]): Unit = {
 
     runningTasks.get(executionId) match {
       case Some(task) =>
@@ -685,17 +687,24 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
 
       for (t <- runningTasks.values) {
         t.failExternally(cause)
-        unregisterTask(t.getExecutionId)
+        unregisterTaskAndNotifyFinalState(t.getExecutionId)
       }
     }
   }
 
-  private def unregisterTask(executionID: ExecutionAttemptID): Unit = {
+  private def unregisterTaskAndNotifyFinalState(executionID: ExecutionAttemptID): Unit = {
     runningTasks.remove(executionID) match {
       case Some(task) =>
         log.info("Unregister task with execution ID {}.", executionID)
         removeAllTaskResources(task)
         libraryCacheManager foreach { _.unregisterTask(task.getJobID, executionID) }
+
+        log.info("Updating FINAL execution state of {} ({}) to {}.", task.getTaskName,
+          task.getExecutionId, task.getExecutionState);
+
+        self ! UpdateTaskExecutionState(new TaskExecutionState(
+          task.getJobID, task.getExecutionId, task.getExecutionState, task.getFailureCause))
+
       case None =>
         if (log.isDebugEnabled) {
           log.debug("Cannot find task with ID {} to unregister.", executionID)
@@ -1194,7 +1203,19 @@ object TaskManager {
         connectionInfo.address(), connectionInfo.dataPort(), pageSize, configuration))
     }
 
-    val networkConfig = NetworkEnvironmentConfiguration(numNetworkBuffers, pageSize, nettyConfig)
+    // Default spill I/O mode for intermediate results
+    val syncOrAsync = configuration.getString(ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE,
+      ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE)
+
+    val ioMode : IOMode = if (syncOrAsync == "async") {
+      IOMode.ASYNC
+    }
+    else {
+      IOMode.SYNC
+    }
+
+    val networkConfig = NetworkEnvironmentConfiguration(numNetworkBuffers, pageSize, ioMode,
+      nettyConfig)
 
     val networkBufferMem = numNetworkBuffers * pageSize
 
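
The new ioMode field threads a cluster-wide default spill I/O mode for intermediate results into the NetworkEnvironment: the configured string "async" selects IOMode.ASYNC, and anything else (including the default) falls back to IOMode.SYNC. A sketch of flipping that switch programmatically; the concrete key string lives in ConfigConstants, which is only named, not shown, in this diff:

    // Request asynchronous spilling of intermediate results; per the code
    // above, any other value yields IOMode.SYNC.
    Configuration config = new Configuration();
    config.setString(ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE, "async");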

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
index 1a274d0..5742fea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
@@ -48,8 +48,8 @@ public class TaskDeploymentDescriptorTest {
 			final Configuration jobConfiguration = new Configuration();
 			final Configuration taskConfiguration = new Configuration();
 			final Class<? extends AbstractInvokable> invokableClass = RegularPactTask.class;
-			final List<PartitionDeploymentDescriptor> producedResults = new ArrayList<PartitionDeploymentDescriptor>(0);
-			final List<PartitionConsumerDeploymentDescriptor> inputGates = new ArrayList<PartitionConsumerDeploymentDescriptor>(0);
+			final List<ResultPartitionDeploymentDescriptor> producedResults = new ArrayList<ResultPartitionDeploymentDescriptor>(0);
+			final List<InputGateDeploymentDescriptor> inputGates = new ArrayList<InputGateDeploymentDescriptor>(0);
 			final List<BlobKey> requiredJars = new ArrayList<BlobKey>(0);
 	
 			final TaskDeploymentDescriptor orig = new TaskDeploymentDescriptor(jobID, vertexID, execId, taskName,
@@ -70,7 +70,7 @@ public class TaskDeploymentDescriptorTest {
 			assertEquals(orig.getIndexInSubtaskGroup(), copy.getIndexInSubtaskGroup());
 			assertEquals(orig.getNumberOfSubtasks(), copy.getNumberOfSubtasks());
 			assertEquals(orig.getProducedPartitions(), copy.getProducedPartitions());
-			assertEquals(orig.getConsumedPartitions(), copy.getConsumedPartitions());
+			assertEquals(orig.getInputGates(), copy.getInputGates());
 
 			assertEquals(orig.getRequiredJarFiles(), copy.getRequiredJarFiles());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 526ba7f..c979c42 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -38,8 +38,8 @@ import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.deployment.PartitionConsumerDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.PartitionDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.Instance;
@@ -138,15 +138,15 @@ public class ExecutionGraphDeploymentTest {
 			assertEquals(RegularPactTask.class.getName(), descr.getInvokableClassName());
 			assertEquals("v2", descr.getTaskName());
 
-			List<PartitionDeploymentDescriptor> producedPartitions = descr.getProducedPartitions();
-			List<PartitionConsumerDeploymentDescriptor> consumedPartitions = descr.getConsumedPartitions();
+			List<ResultPartitionDeploymentDescriptor> producedPartitions = descr.getProducedPartitions();
+			List<InputGateDeploymentDescriptor> consumedPartitions = descr.getInputGates();
 
 			assertEquals(2, producedPartitions.size());
 			assertEquals(1, consumedPartitions.size());
 
-			assertEquals(10, producedPartitions.get(0).getNumberOfQueues());
-			assertEquals(10, producedPartitions.get(1).getNumberOfQueues());
-			assertEquals(10, consumedPartitions.get(0).getPartitions().length);
+			assertEquals(10, producedPartitions.get(0).getNumberOfSubpartitions());
+			assertEquals(10, producedPartitions.get(1).getNumberOfSubpartitions());
+			assertEquals(10, consumedPartitions.get(0).getInputChannelDeploymentDescriptors().length);
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
index c05fcca..0462b3f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
@@ -107,7 +107,7 @@ public class ChannelViewsTest
 		
 		// create the writer output view
 		List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
-		final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel);
+		final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
 		final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);
 		
 		// write a number of pairs
@@ -120,7 +120,7 @@ public class ChannelViewsTest
 		
 		// create the reader input view
 		memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
-		final BlockChannelReader reader = this.ioManager.createBlockChannelReader(channel);
+		final BlockChannelReader<MemorySegment> reader = this.ioManager.createBlockChannelReader(channel);
 		final ChannelReaderInputView inView = new ChannelReaderInputView(reader, memory, outView.getBlockCount(), true);
 		generator.reset();
 		
@@ -151,7 +151,7 @@ public class ChannelViewsTest
 		
 		// create the writer output view
 		List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
-		final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel);
+		final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
 		final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);
 		
 		// write a number of pairs
@@ -164,7 +164,7 @@ public class ChannelViewsTest
 		
 		// create the reader input view
 		memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
-		final BlockChannelReader reader = this.ioManager.createBlockChannelReader(channel);
+		final BlockChannelReader<MemorySegment> reader = this.ioManager.createBlockChannelReader(channel);
 		final ChannelReaderInputView inView = new ChannelReaderInputView(reader, memory, outView.getBlockCount(), true);
 		generator.reset();
 		
@@ -192,7 +192,7 @@ public class ChannelViewsTest
 		
 		// create the writer output view
 		List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
-		final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel);
+		final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
 		final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);
 
 		// write a number of pairs
@@ -205,7 +205,7 @@ public class ChannelViewsTest
 
 		// create the reader input view
 		memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
-		final BlockChannelReader reader = this.ioManager.createBlockChannelReader(channel);
+		final BlockChannelReader<MemorySegment> reader = this.ioManager.createBlockChannelReader(channel);
 		final ChannelReaderInputView inView = new ChannelReaderInputView(reader, memory, outView.getBlockCount(), true);
 		generator.reset();
 
@@ -243,7 +243,7 @@ public class ChannelViewsTest
 		
 		// create the writer output view
 		List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
-		final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel);
+		final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
 		final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);
 		
 		// write a number of pairs
@@ -256,7 +256,7 @@ public class ChannelViewsTest
 		
 		// create the reader input view
 		memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
-		final BlockChannelReader reader = this.ioManager.createBlockChannelReader(channel);
+		final BlockChannelReader<MemorySegment> reader = this.ioManager.createBlockChannelReader(channel);
 		final ChannelReaderInputView inView = new ChannelReaderInputView(reader, memory, true);
 		generator.reset();
 		
@@ -287,7 +287,7 @@ public class ChannelViewsTest
 		
 		// create the writer output view
 		List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, 1);
-		final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel);
+		final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
 		final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);
 		
 		// write a number of pairs
@@ -300,7 +300,7 @@ public class ChannelViewsTest
 		
 		// create the reader input view
 		memory = this.memoryManager.allocatePages(this.parentTask, 1);
-		final BlockChannelReader reader = this.ioManager.createBlockChannelReader(channel);
+		final BlockChannelReader<MemorySegment> reader = this.ioManager.createBlockChannelReader(channel);
 		final ChannelReaderInputView inView = new ChannelReaderInputView(reader, memory, outView.getBlockCount(), true);
 		generator.reset();
 		
@@ -331,7 +331,7 @@ public class ChannelViewsTest
 		
 		// create the writer output view
 		List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
-		final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel);
+		final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
 		final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);
 		
 		// write a number of pairs
@@ -344,7 +344,7 @@ public class ChannelViewsTest
 		
 		// create the reader input view
 		memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
-		final BlockChannelReader reader = this.ioManager.createBlockChannelReader(channel);
+		final BlockChannelReader<MemorySegment> reader = this.ioManager.createBlockChannelReader(channel);
 		final ChannelReaderInputView inView = new ChannelReaderInputView(reader, memory, outView.getBlockCount(), true);
 		generator.reset();
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
index 27928a9..85d2113 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
@@ -88,7 +88,7 @@ public class FileChannelStreamsITCase {
 			final FileIOChannel.ID channel = ioManager.createChannel();
 			
 			// create the writer output view
-			final BlockChannelWriter writer = ioManager.createBlockChannelWriter(channel);
+			final BlockChannelWriter<MemorySegment> writer = ioManager.createBlockChannelWriter(channel);
 			final FileChannelOutputView outView = new FileChannelOutputView(writer, memManager, memory, MEMORY_PAGE_SIZE);
 			
 			// write a number of pairs
@@ -102,7 +102,7 @@ public class FileChannelStreamsITCase {
 			// create the reader input view
 			List<MemorySegment> readMemory = memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS);
 			
-			final BlockChannelReader reader = ioManager.createBlockChannelReader(channel);
+			final BlockChannelReader<MemorySegment> reader = ioManager.createBlockChannelReader(channel);
 			final FileChannelInputView inView = new FileChannelInputView(reader, memManager, readMemory, outView.getBytesInLatestSegment());
 			generator.reset();
 			
@@ -132,7 +132,7 @@ public class FileChannelStreamsITCase {
 			final FileIOChannel.ID channel = this.ioManager.createChannel();
 			
 			// create the writer output view
-			final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel);
+			final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
 			final FileChannelOutputView outView = new FileChannelOutputView(writer, memManager, memory, MEMORY_PAGE_SIZE);
 			
 			// write a number of pairs
@@ -146,7 +146,7 @@ public class FileChannelStreamsITCase {
 			// create the reader input view
 			List<MemorySegment> readMemory = memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS);
 			
-			final BlockChannelReader reader = ioManager.createBlockChannelReader(channel);
+			final BlockChannelReader<MemorySegment> reader = ioManager.createBlockChannelReader(channel);
 			final FileChannelInputView inView = new FileChannelInputView(reader, memManager, readMemory, outView.getBytesInLatestSegment());
 			generator.reset();
 			
@@ -176,7 +176,7 @@ public class FileChannelStreamsITCase {
 			final FileIOChannel.ID channel = this.ioManager.createChannel();
 			
 			// create the writer output view
-			final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel);
+			final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
 			final FileChannelOutputView outView = new FileChannelOutputView(writer, memManager, memory, MEMORY_PAGE_SIZE);
 	
 			// write a number of pairs
@@ -190,7 +190,7 @@ public class FileChannelStreamsITCase {
 			// create the reader input view
 			List<MemorySegment> readMemory = memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS);
 			
-			final BlockChannelReader reader = ioManager.createBlockChannelReader(channel);
+			final BlockChannelReader<MemorySegment> reader = ioManager.createBlockChannelReader(channel);
 			final FileChannelInputView inView = new FileChannelInputView(reader, memManager, readMemory, outView.getBytesInLatestSegment());
 			generator.reset();
 	
@@ -226,7 +226,7 @@ public class FileChannelStreamsITCase {
 			final FileIOChannel.ID channel = this.ioManager.createChannel();
 			
 			// create the writer output view
-			final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel);
+			final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
 			final FileChannelOutputView outView = new FileChannelOutputView(writer, memManager, memory, MEMORY_PAGE_SIZE);
 			
 			// write a number of pairs
@@ -240,7 +240,7 @@ public class FileChannelStreamsITCase {
 			// create the reader input view
 			List<MemorySegment> readMemory = memManager.allocatePages(new DummyInvokable(), 1);
 			
-			final BlockChannelReader reader = ioManager.createBlockChannelReader(channel);
+			final BlockChannelReader<MemorySegment> reader = ioManager.createBlockChannelReader(channel);
 			final FileChannelInputView inView = new FileChannelInputView(reader, memManager, readMemory, outView.getBytesInLatestSegment());
 			generator.reset();
 			
@@ -270,7 +270,7 @@ public class FileChannelStreamsITCase {
 			final FileIOChannel.ID channel = this.ioManager.createChannel();
 			
 			// create the writer output view
-			final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel);
+			final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
 			final FileChannelOutputView outView = new FileChannelOutputView(writer, memManager, memory, MEMORY_PAGE_SIZE);
 			
 			// write a number of pairs
@@ -284,7 +284,7 @@ public class FileChannelStreamsITCase {
 			// create the reader input view
 			List<MemorySegment> readMemory = memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS);
 			
-			final BlockChannelReader reader = ioManager.createBlockChannelReader(channel);
+			final BlockChannelReader<MemorySegment> reader = ioManager.createBlockChannelReader(channel);
 			final FileChannelInputView inView = new FileChannelInputView(reader, memManager, readMemory, outView.getBytesInLatestSegment());
 			generator.reset();
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
index 1db2a6f..1f6899d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
@@ -49,7 +49,7 @@ public class FileChannelStreamsTest {
 			memMan.allocatePages(new DummyInvokable(), memory, 4);
 			
 			FileIOChannel.ID channel = ioManager.createChannel();
-			BlockChannelWriter writer = ioManager.createBlockChannelWriter(channel);
+			BlockChannelWriter<MemorySegment> writer = ioManager.createBlockChannelWriter(channel);
 			
 			FileChannelOutputView out = new FileChannelOutputView(writer, memMan, memory, memMan.getPageSize());
 			new StringValue("Some test text").write(out);
@@ -91,7 +91,7 @@ public class FileChannelStreamsTest {
 				wrt.close();
 			}
 			
-			BlockChannelReader reader = ioManager.createBlockChannelReader(channel);
+			BlockChannelReader<MemorySegment> reader = ioManager.createBlockChannelReader(channel);
 			FileChannelInputView in = new FileChannelInputView(reader, memMan, memory, 9);
 			
 			// read just something

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
index 7e4d70d..f090ef1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
@@ -50,7 +50,7 @@ public class SeekableFileChannelInputViewTest {
 			memMan.allocatePages(new DummyInvokable(), memory, 4);
 			
 			FileIOChannel.ID channel = ioManager.createChannel();
-			BlockChannelWriter writer = ioManager.createBlockChannelWriter(channel);
+			BlockChannelWriter<MemorySegment> writer = ioManager.createBlockChannelWriter(channel);
 			FileChannelOutputView out = new FileChannelOutputView(writer, memMan, memory, memMan.getPageSize());
 			
 			// write some integers across 7.5 pages (7 pages = 114.688 bytes, 8 pages = 131.072 bytes)

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
new file mode 100644
index 0000000..0397de5
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.util.TestNotificationListener;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class AsynchronousBufferFileWriterTest {
+
+	private static final IOManager ioManager = new IOManagerAsync();
+
+	private static final Buffer mockBuffer = mock(Buffer.class);
+
+	private AsynchronousBufferFileWriter writer;
+
+	@Before
+	public void setUp() throws IOException {
+		writer = new AsynchronousBufferFileWriter(ioManager.createChannel(), new RequestQueue<WriteRequest>());
+	}
+
+	@Test
+	public void testAddAndHandleRequest() throws Exception {
+		addRequest();
+		assertEquals("Didn't increment number of outstanding requests.", 1, writer.getNumberOfOutstandingRequests());
+
+		handleRequest();
+		assertEquals("Didn't decrement number of outstanding requests.", 0, writer.getNumberOfOutstandingRequests());
+	}
+
+	@Test
+	public void testSubscribe() throws Exception {
+		final TestNotificationListener listener = new TestNotificationListener();
+
+		// Unsuccessful subscription, because no outstanding requests
+		assertFalse("Allowed to subscribe w/o any outstanding requests.", writer.registerAllRequestsProcessedListener(listener));
+
+		// Successful subscription
+		addRequest();
+		assertTrue("Didn't allow to subscribe.", writer.registerAllRequestsProcessedListener(listener));
+
+		// Test notification
+		handleRequest();
+
+		assertEquals("Listener was not notified.", 1, listener.getNumberOfNotifications());
+	}
+
+	@Test
+	public void testSubscribeAndClose() throws IOException, InterruptedException {
+		final TestNotificationListener listener = new TestNotificationListener();
+
+		final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+
+		final CountDownLatch sync = new CountDownLatch(1);
+
+		addRequest();
+		addRequest();
+
+		writer.registerAllRequestsProcessedListener(listener);
+
+		final Thread asyncCloseThread = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				try {
+					writer.close();
+				}
+				catch (Throwable t) {
+					error.set(t);
+				}
+				finally {
+					sync.countDown();
+				}
+			}
+		});
+
+		asyncCloseThread.start();
+
+		handleRequest();
+		handleRequest();
+
+		sync.await();
+
+		assertEquals("Listener was not notified.", 1, listener.getNumberOfNotifications());
+	}
+
+	@Test
+	public void testConcurrentSubscribeAndHandleRequest() throws Exception {
+		final ExecutorService executor = Executors.newFixedThreadPool(2);
+
+		final TestNotificationListener listener = new TestNotificationListener();
+
+		final Callable<Boolean> subscriber = new Callable<Boolean>() {
+			@Override
+			public Boolean call() throws Exception {
+				return writer.registerAllRequestsProcessedListener(listener);
+			}
+		};
+
+		final Callable<Void> requestHandler = new Callable<Void>() {
+			@Override
+			public Void call() throws Exception {
+				handleRequest();
+				return null;
+			}
+		};
+
+		try {
+			// Repeat this to provoke races
+			for (int i = 0; i < 50000; i++) {
+				listener.reset();
+
+				addRequest();
+
+				Future<Void> handleRequestFuture = executor.submit(requestHandler);
+				Future<Boolean> subscribeFuture = executor.submit(subscriber);
+
+				handleRequestFuture.get();
+
+				try {
+					if (subscribeFuture.get()) {
+						assertEquals("Race: Successfully subscribed, but was never notified.", 1, listener.getNumberOfNotifications());
+					}
+					else {
+						assertEquals("Race: Never subscribed successfully, but was notified.", 0, listener.getNumberOfNotifications());
+					}
+				}
+				catch (Throwable t) {
+					Assert.fail("Run " + i + ": " + t.getMessage());
+				}
+			}
+		}
+		finally {
+			executor.shutdownNow();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
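+	// Enqueues a write request for the mock buffer, incrementing the writer's
+	// count of outstanding requests.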
+	private void addRequest() throws IOException {
+		writer.writeBlock(mockBuffer);
+	}
+
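+	// Completes one request as the I/O thread would; passing null as the error
+	// marks the request as successful.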
+	private void handleRequest() {
+		writer.handleProcessedBuffer(mockBuffer, null);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java
new file mode 100644
index 0000000..49e93c6
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.util.TestNotificationListener;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+@RunWith(PowerMockRunner.class)
+public class AsynchronousFileIOChannelTest {
+
+	private static final Logger LOG = LoggerFactory.getLogger(AsynchronousFileIOChannelTest.class);
+
+	@Test
+	public void testAllRequestsProcessedListenerNotification() throws Exception {
+		// -- Config ----------------------------------------------------------
+		final int numberOfRuns = 10;
+		final int numberOfRequests = 100;
+
+		// -- Setup -----------------------------------------------------------
+		final IOManagerAsync ioManager = new IOManagerAsync();
+
+		final ExecutorService executor = Executors.newFixedThreadPool(3);
+
+		final Random random = new Random();
+
+		final RequestQueue<WriteRequest> requestQueue = new RequestQueue<WriteRequest>();
+
+		@SuppressWarnings("unchecked")
+		final RequestDoneCallback<Buffer> ioChannelCallback = mock(RequestDoneCallback.class);
+
+		final TestNotificationListener listener = new TestNotificationListener();
+
+		// -- The Test --------------------------------------------------------
+		try {
+			// Repeatedly add requests and process them and have one thread try to register as a
+			// listener until the channel is closed and all requests are processed.
+
+			for (int run = 0; run < numberOfRuns; run++) {
+				final TestAsyncFileIOChannel ioChannel = new TestAsyncFileIOChannel(
+						ioManager.createChannel(), requestQueue, ioChannelCallback, true);
+
+				final CountDownLatch sync = new CountDownLatch(3);
+
+				// The mock requests
+				final Buffer buffer = mock(Buffer.class);
+				final WriteRequest request = mock(WriteRequest.class);
+
+				// Add requests task
+				Callable<Void> addRequestsTask = new Callable<Void>() {
+					@Override
+					public Void call() throws Exception {
+						for (int i = 0; i < numberOfRuns; i++) {
+							LOG.debug("Starting run {}.", i + 1);
+
+							for (int j = 0; j < numberOfRequests; j++) {
+								ioChannel.addRequest(request);
+							}
+
+							LOG.debug("Added all ({}) requests of run {}.", numberOfRequests, i + 1);
+
+							int sleep = random.nextInt(10);
+							LOG.debug("Sleeping for {} ms before next run.", sleep);
+
+							Thread.sleep(sleep);
+						}
+
+						LOG.debug("Done. Closing channel.");
+						ioChannel.close();
+
+						sync.countDown();
+
+						return null;
+					}
+				};
+
+				// Process requests task
+				Callable<Void> processRequestsTask = new Callable<Void>() {
+					@Override
+					public Void call() throws Exception {
+						int total = numberOfRequests * numberOfRuns;
+						for (int i = 0; i < total; i++) {
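+							// Take the next request off the queue, as the I/O thread would,
+							// and report its buffer as successfully processed.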
+							requestQueue.take();
+
+							ioChannel.handleProcessedBuffer(buffer, null);
+						}
+
+						LOG.debug("Processed all ({}) requests.", total);
+
+						sync.countDown();
+
+						return null;
+					}
+				};
+
+				// Listener
+				Callable<Void> registerListenerTask = new Callable<Void>() {
+					@Override
+					public Void call() throws Exception {
+						while (true) {
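+							// Registration only succeeds while requests are outstanding;
+							// otherwise re-check whether the channel has been closed.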
+							int current = listener.getNumberOfNotifications();
+
+							if (ioChannel.registerAllRequestsProcessedListener(listener)) {
+								listener.waitForNotification(current);
+							}
+							else if (ioChannel.isClosed()) {
+								break;
+							}
+						}
+
+						LOG.debug("Stopping listener. Channel closed.");
+
+						sync.countDown();
+
+						return null;
+					}
+				};
+
+				// Run tasks in random order
+				final List<Callable<?>> tasks = new LinkedList<Callable<?>>();
+				tasks.add(addRequestsTask);
+				tasks.add(processRequestsTask);
+				tasks.add(registerListenerTask);
+
+				Collections.shuffle(tasks);
+
+				for (Callable<?> task : tasks) {
+					executor.submit(task);
+				}
+
+				if (!sync.await(2, TimeUnit.MINUTES)) {
+					fail("Test failed due to a timeout. This indicates a deadlock due to the way " +
+							"that listeners are registered/notified in the asynchronous file I/O " +
+							"channel.");
+				}
+
+				listener.reset();
+			}
+		}
+		finally {
+			ioManager.shutdown();
+			executor.shutdown();
+		}
+	}
+
+	@Test
+	public void testClosedButAddRequestAndRegisterListenerRace() throws Exception {
+		// -- Config ----------------------------------------------------------
+		final int numberOfRuns = 1024;
+
+		// -- Setup -----------------------------------------------------------
+		final IOManagerAsync ioManager = new IOManagerAsync();
+
+		final ExecutorService executor = Executors.newFixedThreadPool(2);
+
+		final RequestQueue<WriteRequest> requestQueue = new RequestQueue<WriteRequest>();
+
+		@SuppressWarnings("unchecked")
+		final RequestDoneCallback<Buffer> ioChannelCallback = mock(RequestDoneCallback.class);
+
+		final TestNotificationListener listener = new TestNotificationListener();
+
+		// -- The Test --------------------------------------------------------
+		try {
+			// Repeatedly close the channel and add a request.
+			for (int i = 0; i < numberOfRuns; i++) {
+				final TestAsyncFileIOChannel ioChannel = new TestAsyncFileIOChannel(
+						ioManager.createChannel(), requestQueue, ioChannelCallback, true);
+
+				final CountDownLatch sync = new CountDownLatch(2);
+
+				final WriteRequest request = mock(WriteRequest.class);
+
+				ioChannel.close();
+
+				// Add request task
+				Callable<Void> addRequestTask = new Callable<Void>() {
+					@Override
+					public Void call() throws Exception {
+						try {
+							ioChannel.addRequest(request);
+						}
+						catch (Throwable expected) {
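+							// Adding a request to a closed channel may legitimately fail.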
+						}
+						finally {
+							sync.countDown();
+						}
+
+						return null;
+					}
+				};
+
+				// Listener
+				Callable<Void> registerListenerTask = new Callable<Void>() {
+					@Override
+					public Void call() throws Exception {
+						try {
+							while (true) {
+								int current = listener.getNumberOfNotifications();
+
+								if (ioChannel.registerAllRequestsProcessedListener(listener)) {
+									listener.waitForNotification(current);
+								}
+								else if (ioChannel.isClosed()) {
+									break;
+								}
+							}
+						}
+						finally {
+							sync.countDown();
+						}
+
+						return null;
+					}
+				};
+
+				executor.submit(addRequestTask);
+				executor.submit(registerListenerTask);
+
+				if (!sync.await(2, TimeUnit.MINUTES)) {
+					fail("Test failed due to a timeout. This indicates a deadlock due to the way " +
+							"that listeners are registered/notified in the asynchronous file I/O " +
+							"channel.");
+				}
+			}
+		}
+		finally {
+			ioManager.shutdown();
+			executor.shutdown();
+		}
+	}
+
+	@Test
+	public void testClosingWaits() {
+		IOManagerAsync ioMan = new IOManagerAsync();
+		try {
+
+			final int NUM_BLOCKS = 100;
+			final MemorySegment seg = new MemorySegment(new byte[32 * 1024]);
+
+			final AtomicInteger callbackCounter = new AtomicInteger();
+			final AtomicBoolean exceptionOccurred = new AtomicBoolean();
+
+			final RequestDoneCallback<MemorySegment> callback = new RequestDoneCallback<MemorySegment>() {
+
+				@Override
+				public void requestSuccessful(MemorySegment buffer) {
+					// We use the non-thread-safe increment here; the callbacks arrive in
+					// order from the same I/O thread, so this is safe.
+					callbackCounter.set(callbackCounter.get() + 1);
+
+					if (buffer != seg) {
+						exceptionOccurred.set(true);
+					}
+				}
+
+				@Override
+				public void requestFailed(MemorySegment buffer, IOException e) {
+					exceptionOccurred.set(true);
+				}
+			};
+
+			BlockChannelWriterWithCallback<MemorySegment> writer = ioMan.createBlockChannelWriter(ioMan.createChannel(), callback);
+			try {
+				for (int i = 0; i < NUM_BLOCKS; i++) {
+					writer.writeBlock(seg);
+				}
+
+				writer.close();
+
+				assertEquals(NUM_BLOCKS, callbackCounter.get());
+				assertFalse(exceptionOccurred.get());
+			}
+			finally {
+				writer.closeAndDelete();
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			ioMan.shutdown();
+		}
+	}
+
+	@Test
+	public void testExceptionForwardsToClose() {
+		IOManagerAsync ioMan = new IOManagerAsync();
+		try {
+			testExceptionForwardsToClose(ioMan, 100, 1);
+			testExceptionForwardsToClose(ioMan, 100, 50);
+			testExceptionForwardsToClose(ioMan, 100, 100);
+		} finally {
+			ioMan.shutdown();
+		}
+	}
+
+	private void testExceptionForwardsToClose(IOManagerAsync ioMan, final int numBlocks, final int failingBlock) {
+		try {
+			MemorySegment seg = new MemorySegment(new byte[32 * 1024]);
+			FileIOChannel.ID channelId = ioMan.createChannel();
+
+			BlockChannelWriterWithCallback<MemorySegment> writer = new AsynchronousBlockWriterWithCallback(channelId,
+					ioMan.getWriteRequestQueue(channelId), new NoOpCallback()) {
+
+				// Number of blocks written so far; the failingBlock-th write is replaced
+				// by a request that fails on purpose.
+				private int numBlocksWritten;
+
+				@Override
+				public void writeBlock(MemorySegment segment) throws IOException {
+					numBlocksWritten++;
+
+					if (numBlocksWritten == failingBlock) {
+						this.requestsNotReturned.incrementAndGet();
+						this.requestQueue.add(new FailingWriteRequest(this, segment));
+					} else {
+						super.writeBlock(segment);
+					}
+				}
+			};
+
+			try {
+				for (int i = 0; i < numBlocks; i++) {
+					writer.writeBlock(seg);
+				}
+
+				writer.close();
+				fail("did not forward exception");
+			}
+			catch (IOException e) {
+				// expected
+			}
+			finally {
+				try {
+					writer.closeAndDelete();
+				} catch (Throwable t) {}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	private static class NoOpCallback implements RequestDoneCallback<MemorySegment> {
+
+		@Override
+		public void requestSuccessful(MemorySegment buffer) {}
+
+		@Override
+		public void requestFailed(MemorySegment buffer, IOException e) {}
+	}
+
+	private static class FailingWriteRequest implements WriteRequest {
+
+		private final AsynchronousFileIOChannel<MemorySegment, WriteRequest> channel;
+
+		private final MemorySegment segment;
+
+		protected FailingWriteRequest(AsynchronousFileIOChannel<MemorySegment, WriteRequest> targetChannel, MemorySegment segment) {
+			this.channel = targetChannel;
+			this.segment = segment;
+		}
+
+		@Override
+		public void write() throws IOException {
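+			// Always fail, simulating an I/O error while writing this block.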
+			throw new IOException();
+		}
+
+		@Override
+		public void requestDone(IOException ioex) {
+			this.channel.handleProcessedBuffer(this.segment, ioex);
+		}
+	}
+
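+	// Concrete channel subclass that lets the tests drive the base class directly
+	// and inspect the number of outstanding requests.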
+	private static class TestAsyncFileIOChannel extends AsynchronousFileIOChannel<Buffer, WriteRequest> {
+
+		protected TestAsyncFileIOChannel(
+				ID channelID,
+				RequestQueue<WriteRequest> requestQueue,
+				RequestDoneCallback<Buffer> callback,
+				boolean writeEnabled) throws IOException {
+
+			super(channelID, requestQueue, callback, writeEnabled);
+		}
+
+		int getNumberOfOutstandingRequests() {
+			return requestsNotReturned.get();
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelsTest.java
deleted file mode 100644
index 0ed6233..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelsTest.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.disk.iomanager;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
-
-public class AsynchronousFileIOChannelsTest {
-
-	@Test
-	public void testClosingWaits() {
-		IOManagerAsync ioMan = new IOManagerAsync();
-		try {
-			
-			final int NUM_BLOCKS = 100;
-			final MemorySegment seg = new MemorySegment(new byte[32 * 1024]);
-			
-			final AtomicInteger callbackCounter = new AtomicInteger();
-			final AtomicBoolean exceptionOccurred = new AtomicBoolean();
-			
-			final RequestDoneCallback<MemorySegment> callback = new RequestDoneCallback<MemorySegment>() {
-				
-				@Override
-				public void requestSuccessful(MemorySegment buffer) {
-					// we do the non safe variant. the callbacks should come in order from
-					// the same thread, so it should always work
-					callbackCounter.set(callbackCounter.get() + 1);
-					
-					if (buffer != seg) {
-						exceptionOccurred.set(true);
-					}
-				}
-				
-				@Override
-				public void requestFailed(MemorySegment buffer, IOException e) {
-					exceptionOccurred.set(true);
-				}
-			};
-			
-			BlockChannelWriterWithCallback writer = ioMan.createBlockChannelWriter(ioMan.createChannel(), callback);
-			try {
-				for (int i = 0; i < NUM_BLOCKS; i++) {
-					writer.writeBlock(seg);
-				}
-				
-				writer.close();
-				
-				assertEquals(NUM_BLOCKS, callbackCounter.get());
-				assertFalse(exceptionOccurred.get());
-			}
-			finally {
-				writer.closeAndDelete();
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			ioMan.shutdown();
-		}
-	}
-	
-	@Test
-	public void testExceptionForwardsToClose() {
-		IOManagerAsync ioMan = new IOManagerAsync();
-		try {
-			testExceptionForwardsToClose(ioMan, 100, 1);
-			testExceptionForwardsToClose(ioMan, 100, 50);
-			testExceptionForwardsToClose(ioMan, 100, 100);
-		} finally {
-			ioMan.shutdown();
-		}
-	}
-	
-	private void testExceptionForwardsToClose(IOManagerAsync ioMan, final int numBlocks, final int failingBlock) {
-		try {
-			MemorySegment seg = new MemorySegment(new byte[32 * 1024]);
-			FileIOChannel.ID channelId = ioMan.createChannel();
-			
-			BlockChannelWriterWithCallback writer = new AsynchronousBlockWriterWithCallback(channelId, 
-					ioMan.getWriteRequestQueue(channelId), new NoOpCallback()) {
-				
-				private int numBlocks;
-				
-				@Override
-				public void writeBlock(MemorySegment segment) throws IOException {
-					numBlocks++;
-					
-					if (numBlocks == failingBlock) {
-						this.requestsNotReturned.incrementAndGet();
-						this.requestQueue.add(new FailingWriteRequest(this, segment));
-					} else {
-						super.writeBlock(segment);
-					}
-				}
-			};
-			
-			try {
-				for (int i = 0; i < numBlocks; i++) {
-					writer.writeBlock(seg);
-				}
-				
-				writer.close();
-				fail("did not forward exception");
-			}
-			catch (IOException e) {
-				// expected
-			}
-			finally {
-				try {
-					writer.closeAndDelete();
-				} catch (Throwable t) {}
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	private static class NoOpCallback implements RequestDoneCallback<MemorySegment> {
-
-		@Override
-		public void requestSuccessful(MemorySegment buffer) {}
-
-		@Override
-		public void requestFailed(MemorySegment buffer, IOException e) {}
-	}
-	
-	private static class FailingWriteRequest implements WriteRequest {
-		
-		private final AsynchronousFileIOChannel<MemorySegment, WriteRequest> channel;
-		
-		private final MemorySegment segment;
-		
-		protected FailingWriteRequest(AsynchronousFileIOChannel<MemorySegment, WriteRequest> targetChannel, MemorySegment segment) {
-			this.channel = targetChannel;
-			this.segment = segment;
-		}
-
-		@Override
-		public void write() throws IOException {
-			throw new IOException();
-		}
-
-		@Override
-		public void requestDone(IOException ioex) {
-			this.channel.handleProcessedBuffer(this.segment, ioex);
-		}
-	} 
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
new file mode 100644
index 0000000..294a6e6
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.testutils.DiscardingRecycler;
+import org.apache.flink.runtime.util.event.NotificationListener;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class BufferFileWriterFileSegmentReaderTest {
+
+	private static final int BUFFER_SIZE = 32 * 1024;
+
+	private static final BufferRecycler BUFFER_RECYCLER = new DiscardingRecycler();
+
+	private static final Random random = new Random();
+
+	private static final IOManager ioManager = new IOManagerAsync();
+
+	private BufferFileWriter writer;
+
+	private AsynchronousBufferFileSegmentReader reader;
+
+	private LinkedBlockingQueue<FileSegment> returnedFileSegments = new LinkedBlockingQueue<FileSegment>();
+
+	@Before
+	public void setUpWriterAndReader() {
+		final FileIOChannel.ID channel = ioManager.createChannel();
+
+		try {
+			writer = ioManager.createBufferFileWriter(channel);
+			reader = (AsynchronousBufferFileSegmentReader) ioManager.createBufferFileSegmentReader(channel, new QueuingCallback<FileSegment>(returnedFileSegments));
+		}
+		catch (IOException e) {
+			if (writer != null) {
+				writer.deleteChannel();
+			}
+
+			if (reader != null) {
+				reader.deleteChannel();
+			}
+
+			fail("Failed to setup writer and reader.");
+		}
+	}
+
+	@After
+	public void tearDownWriterAndReader() {
+		if (writer != null) {
+			writer.deleteChannel();
+		}
+
+		if (reader != null) {
+			reader.deleteChannel();
+		}
+
+		returnedFileSegments.clear();
+	}
+
+	@Test
+	public void testWriteRead() throws IOException, InterruptedException {
+		int numBuffers = 1024;
+		int currentNumber = 0;
+
+		final int minBufferSize = BUFFER_SIZE / 4;
+
+		// Write buffers filled with ascending numbers...
+		for (int i = 0; i < numBuffers; i++) {
+			final Buffer buffer = createBuffer();
+
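+			// Random payload size, rounded up to a multiple of 4 because the buffer
+			// is filled with ints.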
+			int size = getNextMultipleOf(getRandomNumberInRange(minBufferSize, BUFFER_SIZE), 4);
+
+			buffer.setSize(size);
+
+			currentNumber = fillBufferWithAscendingNumbers(buffer, currentNumber);
+
+			writer.writeBlock(buffer);
+		}
+
+		// Make sure that the writes are finished
+		writer.close();
+
+		// Read buffers back in...
+		for (int i = 0; i < numBuffers; i++) {
+			assertFalse(reader.hasReachedEndOfFile());
+			reader.read();
+		}
+
+		// Wait for all requests to be finished
+		final CountDownLatch sync = new CountDownLatch(1);
+		final NotificationListener listener = new NotificationListener() {
+			@Override
+			public void onNotification() {
+				sync.countDown();
+			}
+		};
+
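+		// If registration fails, all outstanding requests have already been
+		// processed and there is nothing to wait for.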
+		if (reader.registerAllRequestsProcessedListener(listener)) {
+			sync.await();
+		}
+
+		assertTrue(reader.hasReachedEndOfFile());
+
+		// Verify that the content is the same
+		assertEquals("Read fewer buffers than written.", numBuffers, returnedFileSegments.size());
+
+		currentNumber = 0;
+		FileSegment fileSegment;
+
+		ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
+
+		while ((fileSegment = returnedFileSegments.poll()) != null) {
+			buffer.position(0);
+			buffer.limit(fileSegment.getLength());
+
+			fileSegment.getFileChannel().read(buffer, fileSegment.getPosition());
+
+			currentNumber = verifyBufferFilledWithAscendingNumbers(new Buffer(new MemorySegment(buffer.array()), BUFFER_RECYCLER), currentNumber, fileSegment.getLength());
+		}
+
+		reader.close();
+	}
+
+	// ------------------------------------------------------------------------
+
+	private int getRandomNumberInRange(int min, int max) {
+		return random.nextInt((max - min) + 1) + min;
+	}
+
+	private int getNextMultipleOf(int number, int multiple) {
+		final int mod = number % multiple;
+
+		if (mod == 0) {
+			return number;
+		}
+
+		return number + multiple - mod;
+	}
+
+	private Buffer createBuffer() {
+		return new Buffer(new MemorySegment(new byte[BUFFER_SIZE]), BUFFER_RECYCLER);
+	}
+
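+	// Fills the buffer with consecutive ints and returns the next number to use,
+	// so that successive buffers continue a single ascending sequence.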
+	public static int fillBufferWithAscendingNumbers(Buffer buffer, int currentNumber) {
+		MemorySegment segment = buffer.getMemorySegment();
+
+		final int size = buffer.getSize();
+
+		for (int i = 0; i < size; i += 4) {
+			segment.putInt(i, currentNumber++);
+		}
+
+		return currentNumber;
+	}
+
+	private int verifyBufferFilledWithAscendingNumbers(Buffer buffer, int currentNumber, int size) {
+		MemorySegment segment = buffer.getMemorySegment();
+
+		for (int i = 0; i < size; i += 4) {
+			if (segment.getInt(i) != currentNumber++) {
+				throw new IllegalStateException("Read unexpected number from buffer.");
+			}
+		}
+
+		return currentNumber;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
new file mode 100644
index 0000000..b0c702a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.testutils.DiscardingRecycler;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class BufferFileWriterReaderTest {
+
+	private static final int BUFFER_SIZE = 32 * 1024;
+
+	private static final BufferRecycler BUFFER_RECYCLER = new DiscardingRecycler();
+
+	private static final Random random = new Random();
+
+	private static final IOManager ioManager = new IOManagerAsync();
+
+	private BufferFileWriter writer;
+
+	private BufferFileReader reader;
+
+	private LinkedBlockingQueue<Buffer> returnedBuffers = new LinkedBlockingQueue<Buffer>();
+
+	@Before
+	public void setUpWriterAndReader() {
+		final FileIOChannel.ID channel = ioManager.createChannel();
+
+		try {
+			writer = ioManager.createBufferFileWriter(channel);
+			reader = ioManager.createBufferFileReader(channel, new QueuingCallback<Buffer>(returnedBuffers));
+		}
+		catch (IOException e) {
+			if (writer != null) {
+				writer.deleteChannel();
+			}
+
+			if (reader != null) {
+				reader.deleteChannel();
+			}
+
+			fail("Failed to setup writer and reader.");
+		}
+	}
+
+	@After
+	public void tearDownWriterAndReader() {
+		if (writer != null) {
+			writer.deleteChannel();
+		}
+
+		if (reader != null) {
+			reader.deleteChannel();
+		}
+
+		returnedBuffers.clear();
+	}
+
+	@Test
+	public void testWriteRead() throws IOException {
+		int numBuffers = 1024;
+		int currentNumber = 0;
+
+		final int minBufferSize = BUFFER_SIZE / 4;
+
+		// Write buffers filled with ascending numbers...
+		for (int i = 0; i < numBuffers; i++) {
+			final Buffer buffer = createBuffer();
+
+			int size = getNextMultipleOf(getRandomNumberInRange(minBufferSize, BUFFER_SIZE), 4);
+
+			buffer.setSize(size);
+
+			currentNumber = fillBufferWithAscendingNumbers(buffer, currentNumber);
+
+			writer.writeBlock(buffer);
+		}
+
+		// Make sure that the writes are finished
+		writer.close();
+
+		// Read buffers back in...
+		for (int i = 0; i < numBuffers; i++) {
+			assertFalse(reader.hasReachedEndOfFile());
+			reader.readInto(createBuffer());
+		}
+
+		reader.close();
+
+		assertTrue(reader.hasReachedEndOfFile());
+
+		// Verify that the content is the same
+		assertEquals("Read fewer buffers than written.", numBuffers, returnedBuffers.size());
+
+		currentNumber = 0;
+		Buffer buffer;
+
+		while ((buffer = returnedBuffers.poll()) != null) {
+			currentNumber = verifyBufferFilledWithAscendingNumbers(buffer, currentNumber);
+		}
+	}
+
+	@Test
+	public void testWriteSkipRead() throws IOException {
+		int numBuffers = 1024;
+		int currentNumber = 0;
+
+		final int minBufferSize = BUFFER_SIZE / 4;
+
+		// Write buffers filled with ascending numbers...
+		for (int i = 0; i < numBuffers; i++) {
+			final Buffer buffer = createBuffer();
+
+			currentNumber = fillBufferWithAscendingNumbers(buffer, currentNumber);
+
+			writer.writeBlock(buffer);
+		}
+
+		// Make sure that the writes are finished
+		writer.close();
+
+		final int toSkip = 32;
+
+		// Skip first buffers...
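+		// On disk, each block is an 8 byte header followed by the payload; since all
+		// blocks here are full-size, block n starts at offset (8 + BUFFER_SIZE) * n.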
+		reader.seekToPosition((8 + BUFFER_SIZE) * toSkip);
+
+		numBuffers -= toSkip;
+
+		// Read buffers back in...
+		for (int i = 0; i < numBuffers; i++) {
+			assertFalse(reader.hasReachedEndOfFile());
+			reader.readInto(createBuffer());
+		}
+
+		reader.close();
+
+		assertTrue(reader.hasReachedEndOfFile());
+
+		// Verify that the content is the same
+		assertEquals("Read fewer buffers than written.", numBuffers, returnedBuffers.size());
+
+		// Start number after skipped buffers...
+		currentNumber = (BUFFER_SIZE / 4) * toSkip;
+
+		Buffer buffer;
+		while ((buffer = returnedBuffers.poll()) != null) {
+			currentNumber = verifyBufferFilledWithAscendingNumbers(buffer, currentNumber);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private int getRandomNumberInRange(int min, int max) {
+		return random.nextInt((max - min) + 1) + min;
+	}
+
+	private int getNextMultipleOf(int number, int multiple) {
+		final int mod = number % multiple;
+
+		if (mod == 0) {
+			return number;
+		}
+
+		return number + multiple - mod;
+	}
+
+	private Buffer createBuffer() {
+		return new Buffer(new MemorySegment(new byte[BUFFER_SIZE]), BUFFER_RECYCLER);
+	}
+
+	public static int fillBufferWithAscendingNumbers(Buffer buffer, int currentNumber) {
+		MemorySegment segment = buffer.getMemorySegment();
+
+		final int size = buffer.getSize();
+
+		for (int i = 0; i < size; i += 4) {
+			segment.putInt(i, currentNumber++);
+		}
+
+		return currentNumber;
+	}
+
+	private int verifyBufferFilledWithAscendingNumbers(Buffer buffer, int currentNumber) {
+		MemorySegment segment = buffer.getMemorySegment();
+
+		final int size = buffer.getSize();
+
+		for (int i = 0; i < size; i += 4) {
+			if (segment.getInt(i) != currentNumber++) {
+				throw new IllegalStateException("Read unexpected number from buffer.");
+			}
+		}
+
+		return currentNumber;
+	}
+}